[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly
[ https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190573#comment-17190573 ] Danny Chen commented on FLINK-19133: Hi, [~dwysakowicz], may I take this issue?

> User provided kafka partitioners are not initialized correctly
> Key: FLINK-19133
> URL: https://issues.apache.org/jira/browse/FLINK-19133
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.11.1
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Fix For: 1.12.0, 1.11.2
>
> Reported in the ML:
> https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E
>
> If a user provides a partitioner in combination with a SerializationSchema, it
> is not initialized correctly and has no access to the parallel instance index
> or the number of parallel instances.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
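For context: a custom Flink Kafka partitioner receives the parallel instance index and count through an `open(...)` callback before any record is partitioned; the bug is that this callback is skipped on the SerializationSchema path. A self-contained toy sketch (plain Python, not Flink's actual classes; all names here are illustrative) of why a missed `open()` leaves such a partitioner unusable:

```python
class SubtaskAwarePartitioner:
    """Mimics a partitioner that must be open()-ed before use (illustration only)."""

    def __init__(self):
        self.subtask_index = None  # only known after open() is called

    def open(self, parallel_instance_id, parallel_instances):
        # In Flink this is invoked once per parallel sink instance.
        self.subtask_index = parallel_instance_id
        self.parallel_instances = parallel_instances

    def partition(self, record, num_partitions):
        if self.subtask_index is None:
            # This is the reported failure mode: open() was never called.
            raise RuntimeError("partitioner was never initialized via open()")
        # e.g. fixed partitioning: each subtask writes to one partition
        return self.subtask_index % num_partitions


p = SubtaskAwarePartitioner()
p.open(parallel_instance_id=1, parallel_instances=4)
assert p.partition("some-record", num_partitions=3) == 1
```

If `open()` is skipped, as in the bug report, every call to `partition()` fails because the subtask index was never provided.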
[GitHub] [flink] klion26 commented on a change in pull request #13279: [FLINK-19090][docs-zh] Translate "Local Cluster" of "Clusters & Depolyment" page into Chinese
klion26 commented on a change in pull request #13279: URL: https://github.com/apache/flink/pull/13279#discussion_r483402111

## File path: docs/ops/deployment/local.zh.md

@@ -122,38 +121,38 @@
 INFO ... - Recovering all persisted jobs. INFO ... - Registering TaskManager ... at ResourceManager {% endhighlight %}
- Windows Cygwin Users
+ Windows Cygwin 用户
-If you are installing Flink from the git repository and you are using the Windows git shell, Cygwin can produce a failure similar to this one:
+如果你使用 Windows 的 git shell 从 git 仓库安装 Flink,Cygwin 可能会产生类似如下的错误:
 {% highlight bash %} c:/flink/bin/start-cluster.sh: line 30: $'\r': command not found {% endhighlight %}
-This error occurs because git is automatically transforming UNIX line endings to Windows style line endings when running in Windows. The problem is that Cygwin can only deal with UNIX style line endings. The solution is to adjust the Cygwin settings to deal with the correct line endings by following these three steps:
+这个错误是因为当 git 运行在 Windows 上时,它会自动地将 UNIX 的行结束符转换成 Windows 风格的行结束符,但是 Cygwin 只能处理 Unix 风格的行结束符。解决方案是通过以下三步调整 Cygwin 的配置,使其能够正确处理行结束符:
-1. Start a Cygwin shell.
+1. 启动 Cygwin shell。
-2. Determine your home directory by entering
+2. 输入以下命令进入你的家目录
 {% highlight bash %} cd; pwd {% endhighlight %}
-This will return a path under the Cygwin root path.
+这个命令会在 Cygwin 的根目录下返回路径地址。
-3. Using NotePad, WordPad or a different text editor open the file `.bash_profile` in the home directory and append the following: (If the file does not exist you will have to create it)
+3. 使用 NotePad、WordPad 或其他的文本编辑器在家目录下打开 `.bash_profile` 文件并添加以下内容:(如果这个文件不存在则需要创建)
 {% highlight bash %} export SHELLOPTS set -o igncr {% endhighlight %}
-Save the file and open a new bash shell.
+保存文件并打开一个新的 bash shell。
-### Stop a Local Flink Cluster
+### 关闭 Flink 本地集群

Review comment: Personally I prefer to add `` tags for all titles.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix
[ https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-19108. Fix Version/s: (was: 1.12.0) Assignee: Danny Chen Resolution: Fixed

master: fb29aa22e29d62ce0bb58befa969e6eb09af
release-1.11: f2cc139d99e2ac08182c7203f63f83168a821c44

> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.0, 1.11.2
> Reporter: Danny Chen
> Assignee: Danny Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.2
>
> For the query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>     row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
> where rowNum = 1
> {code}
> During validation, the inner query is assigned an alias by the system with the
> prefix "EXPR$1". In the `Expander`, we then replace the ids in the inner query
> with this prefix, which is wrong because we no longer add the alias to the
> inner query.
>
> To solve the problem, skip the expanding of ids with "EXPR$", just like how
> {{SqlUtil#deriveAliasFromOrdinal}} added it.
>
> This was introduced by FLINK-18750.
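The fix described above, skipping expansion for system-generated aliases, can be sketched as follows (illustrative Python, not the actual Flink/Calcite Java code; the prefix check mirrors how `SqlUtil#deriveAliasFromOrdinal` names generated aliases):

```python
# Sketch of the fix idea: identifiers whose scope alias was generated by the
# system with the "EXPR$" prefix should be skipped when the Expander rewrites
# identifiers to their fully-qualified form, because that alias never appears
# in the rewritten query text.

SYSTEM_ALIAS_PREFIX = "EXPR$"  # as produced by SqlUtil#deriveAliasFromOrdinal

def expand_identifier(scope_alias: str, column: str) -> str:
    """Qualify `column` with its scope alias, unless the alias is system-generated."""
    if scope_alias.startswith(SYSTEM_ALIAS_PREFIX):
        # Do not expand with a system alias that was never added to the query.
        return column
    return f"{scope_alias}.{column}"

assert expand_identifier("source", "f0") == "source.f0"
assert expand_identifier("EXPR$1", "rowNum") == "rowNum"
```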
[GitHub] [flink] JingsongLi merged pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…
JingsongLi merged pull request #13293: URL: https://github.com/apache/flink/pull/13293
[GitHub] [flink] flinkbot commented on pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API
flinkbot commented on pull request #13325: URL: https://github.com/apache/flink/pull/13325#issuecomment-686925091 ## CI report: * 26abf1ecf936d0232c6d49c0bdd4fe3beae6252e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-statefun] tzulitai commented on pull request #142: [FLINK-19130] [core] Expose metrics for backpressure
tzulitai commented on pull request #142: URL: https://github.com/apache/flink-statefun/pull/142#issuecomment-686920891 Discussion point: Strictly speaking, the per-operator `inflight-async-ops` metric is redundant, since most metric reporter systems commonly used by Flink (Prometheus / InfluxDB etc.) have a query language for aggregating metrics. Therefore, the per-operator `inflight-async-ops` metric can be aggregated from the per-function ones. @igalshilman what do you think about this?
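tzulitai's point, that the per-operator value is derivable from the per-function metrics, amounts to a simple aggregation that metric backends express in their query languages (e.g. a sum over a label). A toy sketch with made-up function-type names and values:

```python
# Hypothetical per-function-type gauge values, as a metrics backend might scrape them.
inflight_async_ops_per_function = {
    "example/greeter": 12,
    "example/counter": 3,
    "example/router": 0,
}

# The per-operator `inflight-async-ops` metric is just the sum over function types;
# systems like Prometheus or InfluxDB can compute this at query time instead of
# Flink exposing it as a separate metric.
operator_inflight_async_ops = sum(inflight_async_ops_per_function.values())
assert operator_inflight_async_ops == 15
```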
[GitHub] [flink-statefun] tzulitai opened a new pull request #142: [FLINK-19130] [core] Expose metrics for backpressure
tzulitai opened a new pull request #142: URL: https://github.com/apache/flink-statefun/pull/142

This PR adds in total 2 new metrics related to backpressure:
- Number of blocked addresses (per function type)
- Number of inflight async operations (per function type + per operator)

We also decided against adding the following metric, since after some discussion it doesn't seem to add much value:
- Number of accumulated records pre-flight in batch per function type. This was not added, with the assumption that users would really only care that some address has reached the maximum request batch size and is being blocked.

---
## Verification

I verified this by running the Python Greeter example, with the following modifications to let backpressure happen more easily:
- Maximum batch size = 1
- No delay between each generated message to have maximum input rate

You can see the following metric charts in the Flink Web UI: ![image](https://user-images.githubusercontent.com/5284370/92202926-a0f19e00-eeb2-11ea-946d-cbae5bb82260.png)

---
## Brief changelog

- 78cbc19 Extends the `FunctionTypeMetrics` interface to include the new metrics, and adds a new `FunctionDispatcherMetrics` interface for per-operator metrics.
- 6408866 Introduce a scoped-down interface `FunctionTypeMetricsRepository` and let `StatefulFunctionsRepository` extend it. Components that need to access function metrics will be passed this interface.
- 0fbb17a Preliminary extension to the `ObjectContainer` DI utility so that we can share the same instance across different object labels.
- 6c77ca4 Wire in the new metrics in `AsyncSink` to expose backpressure metrics.
[jira] [Updated] (FLINK-19130) Expose backpressure metrics / logs for function dispatcher operator
[ https://issues.apache.org/jira/browse/FLINK-19130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19130: --- Labels: pull-request-available (was: )

> Expose backpressure metrics / logs for function dispatcher operator
> Key: FLINK-19130
> URL: https://issues.apache.org/jira/browse/FLINK-19130
> Project: Flink
> Issue Type: Task
> Components: Stateful Functions
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: pull-request-available
>
> As of now, there is no visibility on why or how backpressure is applied in
> Stateful Functions.
> This JIRA attempts to add two metrics as an initial effort of providing more
> visibility:
> - Total number of addresses that have asked to be blocked
> - Total number of inflight pending async operations
[GitHub] [flink] flinkbot commented on pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API
flinkbot commented on pull request #13325: URL: https://github.com/apache/flink/pull/13325#issuecomment-686918964

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 26abf1ecf936d0232c6d49c0bdd4fe3beae6252e (Fri Sep 04 05:34:06 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] dianfu opened a new pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API
dianfu opened a new pull request #13325: URL: https://github.com/apache/flink/pull/13325

## What is the purpose of the change

This pull request adds support to use a Python UDF directly in the Python Table API. E.g. for a Python UDF inc, users could use it directly in the Python Table API: tab.select(inc(tab.a)).insert_into("sink")

## Verifying this change

This change is already covered by the updated existing tests: test_udf.py, test_pandas_udf.py, etc.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (Will add documentation in a separate PR)
[jira] [Updated] (FLINK-15974) Support to use the Python UDF directly in the Python Table API
[ https://issues.apache.org/jira/browse/FLINK-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15974: --- Labels: pull-request-available (was: )

> Support to use the Python UDF directly in the Python Table API
> Key: FLINK-15974
> URL: https://issues.apache.org/jira/browse/FLINK-15974
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Currently, a Python UDF has to be registered before being used in the Python Table API,
> e.g.
> {code}
> t_env.register_function("inc", inc)
> table.select("inc(id)") \
>     .insert_into("sink")
> {code}
> It would be great if we could support using a Python UDF directly in the
> Python Table API, e.g.
> {code}
> table.select(inc("id")) \
>     .insert_into("sink")
> {code}
[jira] [Assigned] (FLINK-15974) Support to use the Python UDF directly in the Python Table API
[ https://issues.apache.org/jira/browse/FLINK-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-15974: --- Assignee: Dian Fu (was: Shuiqiang Chen)

> Support to use the Python UDF directly in the Python Table API
> Key: FLINK-15974
> URL: https://issues.apache.org/jira/browse/FLINK-15974
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Fix For: 1.12.0
>
> Currently, a Python UDF has to be registered before being used in the Python Table API,
> e.g.
> {code}
> t_env.register_function("inc", inc)
> table.select("inc(id)") \
>     .insert_into("sink")
> {code}
> It would be great if we could support using a Python UDF directly in the
> Python Table API, e.g.
> {code}
> table.select(inc("id")) \
>     .insert_into("sink")
> {code}
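The before/after snippets in the issue boil down to one requirement: the UDF object itself must be callable and build an expression, instead of being looked up by its registered string name. A toy sketch of that idea (all names below are illustrative; this is not PyFlink's real implementation):

```python
# Toy sketch of the idea behind FLINK-15974: make the Python UDF object itself
# callable so it can be used inline, instead of registering it under a name first.

class Expression:
    """A stand-in for a Table API expression node (illustrative only)."""
    def __init__(self, repr_str):
        self.repr_str = repr_str

class UserDefinedFunction:
    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def __call__(self, *args):
        # Calling the UDF in the Table API builds an expression tree;
        # it does not execute the wrapped Python function eagerly.
        arg_list = ", ".join(
            a.repr_str if isinstance(a, Expression) else repr(a) for a in args
        )
        return Expression(f"{self.name}({arg_list})")

def udf(func):
    return UserDefinedFunction(func)

@udf
def inc(x):
    return x + 1

col_a = Expression("tab.a")
expr = inc(col_a)            # usable directly, no register_function("inc", inc)
assert expr.repr_str == "inc(tab.a)"
```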
[GitHub] [flink] klion26 commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
klion26 commented on a change in pull request #13225: URL: https://github.com/apache/flink/pull/13225#discussion_r483392234 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/Fetch.java ## @@ -0,0 +1,13 @@ +package org.apache.flink.table.api; Review comment: please revert these changes
[GitHub] [flink] klion26 commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
klion26 commented on a change in pull request #13225: URL: https://github.com/apache/flink/pull/13225#discussion_r483392129 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java ## @@ -121,9 +121,9 @@ private SortQueryOperation validateAndGetChildSort(QueryOperation child, PostRes previousSort = (SortQueryOperation) createSort(Collections.emptyList(), child, postResolverFactory); Review comment: could you please revert these unrelated changes?
[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.
flinkbot edited a comment on pull request #13015: URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783 ## CI report: * 8aac0b56736097238b12499b837ed010fcbf7d15 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6186) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.
flinkbot edited a comment on pull request #13015: URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783 ## CI report: * 820de7186da99bece7f43b5300781d7b3a2dbe41 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5482) * 8aac0b56736097238b12499b837ed010fcbf7d15 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6186) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.
flinkbot edited a comment on pull request #13322: URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976 ## CI report: * ef15d07af7eff785478fb42a3cfb67a7463aeb06 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6183) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.
flinkbot edited a comment on pull request #13015: URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783 ## CI report: * 820de7186da99bece7f43b5300781d7b3a2dbe41 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5482) * 8aac0b56736097238b12499b837ed010fcbf7d15 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error
flinkbot edited a comment on pull request #13324: URL: https://github.com/apache/flink/pull/13324#issuecomment-686875157 ## CI report: * 04c250f4f16667c122c12e4b793204337d204c59 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6185) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1
flinkbot edited a comment on pull request #13323: URL: https://github.com/apache/flink/pull/13323#issuecomment-686869045 ## CI report: * 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6184) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error
flinkbot commented on pull request #13324: URL: https://github.com/apache/flink/pull/13324#issuecomment-686875157 ## CI report: * 04c250f4f16667c122c12e4b793204337d204c59 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingsongLi commented on a change in pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down
JingsongLi commented on a change in pull request #13306: URL: https://github.com/apache/flink/pull/13306#discussion_r483360115

## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java

@@ -83,14 +96,227 @@
 private static Properties getOrcProperties(ReadableConfig options) { return orcProperties; }
+ private boolean isUnaryValid(CallExpression callExpression) {
+     return callExpression.getChildren().size() == 1 && callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+ }
+
+ private boolean isBinaryValid(CallExpression callExpression) {
+     return callExpression.getChildren().size() == 2 && ((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+         (callExpression.getChildren().get(0) instanceof ValueLiteralExpression && callExpression.getChildren().get(1) instanceof FieldReferenceExpression));
+ }
+
+ public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+     if (expression instanceof CallExpression) {
+         CallExpression callExp = (CallExpression) expression;
+         FunctionDefinition funcDef = callExp.getFunctionDefinition();
+
+         if (funcDef == BuiltInFunctionDefinitions.IS_NULL || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == BuiltInFunctionDefinitions.NOT) {
+             if (!isUnaryValid(callExp)) {
+                 // not a valid predicate
+                 LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                 return null;
+             }
+
+             PredicateLeaf.Type colType = toOrcType(((FieldReferenceExpression) callExp.getChildren().get(0)).getOutputDataType());
+             if (colType == null) {
+                 // unsupported type
+                 LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", callExp);
+                 return null;
+             }
+
+             String colName = getColumnName(callExp);
+
+             if (funcDef == BuiltInFunctionDefinitions.IS_NULL) {
+                 return new OrcSplitReader.IsNull(colName, colType);
+             } else if (funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL) {
+                 return new OrcSplitReader.Not(
+                     new OrcSplitReader.IsNull(colName, colType));
+             } else {
+                 OrcSplitReader.Predicate c = toOrcPredicate(callExp.getChildren().get(0));
+                 if (c == null) {
+                     return null;
+                 } else {
+                     return new OrcSplitReader.Not(c);
+                 }
+             }
+         } else if (funcDef == BuiltInFunctionDefinitions.OR) {
+             if (callExp.getChildren().size() < 2) {
+                 return null;
+             }
+             Expression left = callExp.getChildren().get(0);
+             Expression right = callExp.getChildren().get(1);
+
+             OrcSplitReader.Predicate c1 = toOrcPredicate(left);
+             OrcSplitReader.Predicate c2 = toOrcPredicate(right);
+             if (c1 == null || c2 == null) {
+                 return null;
+             } else {
+                 return new OrcSplitReader.Or(c1, c2);
+             }
+         } else {
+             if (!isBinaryValid(callExp)) {
+                 // not a valid predicate
+                 LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                 return null;
+             }
+
+             PredicateLeaf.Type litType = getLiteralType(callExp);
+             if (litType == null) {
+                 // unsupported literal type
+                 LOG.debug("Unsupported predicate [{}] cannot be pushed into
[GitHub] [flink] JingsongLi commented on a change in pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down
JingsongLi commented on a change in pull request #13306: URL: https://github.com/apache/flink/pull/13306#discussion_r483358009

## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java

@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig options) { return orcProperties; } + private boolean isUnaryValid(CallExpression callExpression) {

Review comment: Can you move these codes to a class: `OrcFilters`

## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java

@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig options) { return orcProperties; } + private boolean isUnaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 1 && callExpression.getChildren().get(0) instanceof FieldReferenceExpression; + } + + private boolean isBinaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 2 && ((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && callExpression.getChildren().get(1) instanceof ValueLiteralExpression) || + (callExpression.getChildren().get(0) instanceof ValueLiteralExpression && callExpression.getChildren().get(1) instanceof FieldReferenceExpression)); + } + + public OrcSplitReader.Predicate toOrcPredicate(Expression expression) { + if (expression instanceof CallExpression) { + CallExpression callExp = (CallExpression) expression; + FunctionDefinition funcDef = callExp.getFunctionDefinition(); + + if (funcDef == BuiltInFunctionDefinitions.IS_NULL || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == BuiltInFunctionDefinitions.NOT) {

Review comment: Maybe we can have a `static final ImmutableMap> FILTERS`. The function style can make codes better.

## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java

@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig options) { return orcProperties; } + private boolean isUnaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 1 && callExpression.getChildren().get(0) instanceof FieldReferenceExpression; + } + + private boolean isBinaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 2 && ((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && callExpression.getChildren().get(1) instanceof ValueLiteralExpression) || + (callExpression.getChildren().get(0) instanceof ValueLiteralExpression && callExpression.getChildren().get(1) instanceof FieldReferenceExpression)); + } + + public OrcSplitReader.Predicate toOrcPredicate(Expression expression) { + if (expression instanceof CallExpression) { + CallExpression callExp = (CallExpression) expression; + FunctionDefinition funcDef = callExp.getFunctionDefinition(); + + if (funcDef == BuiltInFunctionDefinitions.IS_NULL || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == BuiltInFunctionDefinitions.NOT) { + if (!isUnaryValid(callExp)) { + // not a valid predicate + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", callExp); + return null; + } + + PredicateLeaf.Type colType = toOrcType(((FieldReferenceExpression) callExp.getChildren().get(0)).getOutputDataType()); + if (colType == null) { + // unsupported type + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", callExp); + return null; + } + + String colName = getColumnName(callExp); + + if (funcDef == BuiltInFunctionDefinitions.IS_NULL) { + return new OrcSplitReader.IsNull(colName, colType); + } else if (funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL) { + return new OrcSplitReader.Not( + new OrcSplitReader.IsNull(colName, colType)); + } else { +
[jira] [Closed] (FLINK-14087) throws java.lang.ArrayIndexOutOfBoundsException when emitting data using RebalancePartitioner.
[ https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao closed FLINK-14087. ---

> throws java.lang.ArrayIndexOutOfBoundsException when emitting data using RebalancePartitioner
> Key: FLINK-14087
> URL: https://issues.apache.org/jira/browse/FLINK-14087
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Runtime / Network
> Affects Versions: 1.8.0, 1.8.1, 1.9.0
> Reporter: luojiangyu
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2019-09-16-19-14-39-403.png, image-2019-09-16-19-15-34-639.png
>
> There is a condition in which RecordWriters share a ChannelSelector instance. When two
> RecordWriter instances share the same ChannelSelector instance, it may throw a
> java.lang.ArrayIndexOutOfBoundsException. For example, two RecordWriter instances share
> the same RebalancePartitioner instance: the RebalancePartitioner is set up with 2 channels
> when the first RecordWriter initializes, and the same RebalancePartitioner is then set up
> with 3 channels when the second RecordWriter initializes. This throws an
> ArrayIndexOutOfBoundsException when the first RecordWriter instance emits data.
> The Exception likes > |java.lang.RuntimeException: 2 at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107) at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734) at > java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.ArrayIndexOutOfBoundsException: 2 at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162) > at > 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109) > ... 14 more| -- This message was sent by Atlassian Jira (v8.3.4#803005)
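The failure mode above can be reproduced outside Flink. The following is a minimal standalone sketch (these are NOT Flink's actual classes; the names only mirror the roles of RebalancePartitioner and RecordWriter): two writers share one round-robin selector, so the second writer's setup call silently re-sizes the selector for the first writer as well.

```java
// Minimal sketch of the shared-selector hazard described in FLINK-14087.
// RoundRobinSelector and Writer are illustrative stand-ins, not Flink code.
public class SharedSelectorDemo {

    /** Simplified stand-in for RebalancePartitioner: round-robin channel selection. */
    static class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            // Each writer calls setup(); the last call wins for ALL sharers.
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    /** Simplified stand-in for RecordWriter: a fixed-size array of channel buffers. */
    static class Writer {
        final RoundRobinSelector selector;
        final StringBuilder[] channels;

        Writer(RoundRobinSelector selector, int numChannels) {
            this.selector = selector;
            selector.setup(numChannels); // the problematic shared call
            channels = new StringBuilder[numChannels];
            for (int i = 0; i < numChannels; i++) {
                channels[i] = new StringBuilder();
            }
        }

        void emit(String record) {
            // Throws ArrayIndexOutOfBoundsException once the shared selector
            // returns an index >= this writer's own channel count.
            channels[selector.selectChannel()].append(record);
        }
    }

    public static void main(String[] args) {
        RoundRobinSelector shared = new RoundRobinSelector();
        Writer first = new Writer(shared, 2);  // selector sized to 2 channels
        Writer second = new Writer(shared, 3); // selector re-sized to 3 channels

        first.emit("a"); // channel 0 - fine
        first.emit("b"); // channel 1 - fine
        try {
            first.emit("c"); // selector picks channel 2; 'first' only has 2 channels
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("caught ArrayIndexOutOfBoundsException");
        }
    }
}
```

The fix direction implied by the ticket is to stop sharing one mutable selector instance across writers with different channel counts.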
[jira] [Commented] (FLINK-14087) Throws java.lang.ArrayIndexOutOfBoundsException when emitting data using RebalancePartitioner.
[ https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190507#comment-17190507 ] Yun Gao commented on FLINK-14087: - fix in master: d7fe9af0b6caa1c76e811240e53434969be771b1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (FLINK-14087) Throws java.lang.ArrayIndexOutOfBoundsException when emitting data using RebalancePartitioner.
[ https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao resolved FLINK-14087. - Resolution: Fixed -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error
flinkbot commented on pull request #13324: URL: https://github.com/apache/flink/pull/13324#issuecomment-686872257 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 04c250f4f16667c122c12e4b793204337d204c59 (Fri Sep 04 03:02:12 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190505#comment-17190505 ] Danny Chen commented on FLINK-19127: Thanks, indeed, notebook like zeppelin may need this ~ > Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment > for TableEnvironment > > > Key: FLINK-19127 > URL: https://issues.apache.org/jira/browse/FLINK-19127 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Timo Walther >Priority: Major > > Connecting to a remote cluster from the unified TableEnvironment is neither > tested nor documented. Since StreamExecutionEnvironment is not necessary > anymore, users should be able to do the same in TableEnvironment easily. This > is in particular useful for interactive sessions that run in an IDE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] haveanote opened a new pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error
haveanote opened a new pull request #13324: URL: https://github.com/apache/flink/pull/13324 ## What is the purpose of the change *There is a doc error in ExecutionEnvironment.readTextFileWithValue . Change utf-0 to utf-8* ## Brief change log - *Change utf-0 to utf-8* - *line 236 @param charsetName The name of the character set used to read the file. Default is UTF-8* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-19138) Python UDF supports directly specifying input_types as DataTypes.ROW
Huang Xingbo created FLINK-19138: Summary: Python UDF supports directly specifying input_types as DataTypes.ROW Key: FLINK-19138 URL: https://issues.apache.org/jira/browse/FLINK-19138 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Huang Xingbo Fix For: 1.12.0 Python UDF supports input_types=DataTypes.ROW -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1
flinkbot commented on pull request #13323: URL: https://github.com/apache/flink/pull/13323#issuecomment-686869045 ## CI report: * 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1
flinkbot commented on pull request #13323: URL: https://github.com/apache/flink/pull/13323#issuecomment-686867076 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 (Fri Sep 04 02:43:36 UTC 2020) **Warnings:** * **1 pom.xml files were touched**: Check for build and licensing issues. * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19137).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-19137) Bump Apache Parquet to 1.11.1
[ https://issues.apache.org/jira/browse/FLINK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190497#comment-17190497 ] ABC commented on FLINK-19137: - [https://github.com/apache/flink/pull/13323] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] C08061 opened a new pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1
C08061 opened a new pull request #13323: URL: https://github.com/apache/flink/pull/13323 https://issues.apache.org/jira/browse/FLINK-19137 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19137) Bump Apache Parquet to 1.11.1
[ https://issues.apache.org/jira/browse/FLINK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19137: --- Labels: pull-request-available (was: ) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.
flinkbot edited a comment on pull request #13322: URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976 ## CI report: * ef15d07af7eff785478fb42a3cfb67a7463aeb06 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6183) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-19137) Bump Apache Parquet to 1.11.1
ABC created FLINK-19137: --- Summary: Bump Apache Parquet to 1.11.1 Key: FLINK-19137 URL: https://issues.apache.org/jira/browse/FLINK-19137 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: ABC Attachments: image-2020-09-04-10-21-00-688.png, image-2020-09-04-10-24-42-480.png Apache Parquet 1.11.1 fixed several important issues: * https://issues.apache.org/jira/browse/PARQUET-1309 * https://issues.apache.org/jira/browse/PARQUET-1510 * https://issues.apache.org/jira/browse/PARQUET-1485 The Flink master branch currently relies on Parquet 1.10.0, and the flink-sql-parquet artifact shades the Parquet class files into flink-sql-parquet.jar. This may lead to the direct-memory leak from PARQUET-1485, the Parquet properties bug from PARQUET-1309, or the repeated-values-with-dictionary-encoding error from PARQUET-1510. For example, in PARQUET-1309: !image-2020-09-04-10-21-00-688.png! and then in Flink: [https://github.com/C08061/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java#L166] !image-2020-09-04-10-24-42-480.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
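If the bump goes ahead, the change itself would likely be a one-line version-property update in the Maven build. A sketch only: the property name `flink.format.parquet.version` is an assumption for illustration, and the real name should be checked against Flink's parent pom.

```xml
<!-- Hypothetical fragment: bumping the Parquet version property in the
     parent pom. The property name is assumed, not verified against Flink. -->
<properties>
  <flink.format.parquet.version>1.11.1</flink.format.parquet.version>
</properties>
```

Because flink-sql-parquet shades Parquet classes into its jar, the shaded artifact's NOTICE/licensing entries would also need to be rechecked after such a bump, as the bot warning above hints.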
[GitHub] [flink-statefun] tzulitai closed pull request #141: [FLINK-19129] [k8s] Update log4j-console in template Helm chart
tzulitai closed pull request #141: URL: https://github.com/apache/flink-statefun/pull/141 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lirui-apache commented on pull request #13315: [FLINK-19070][hive] Hive connector should throw a meaningful exception if user reads/writes ACID tables
lirui-apache commented on pull request #13315: URL: https://github.com/apache/flink/pull/13315#issuecomment-686860019 @SteNicholas Thanks for working on this. I think the check should be enforced when getting source/sink for a hive table, rather than creating the hive table itself. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.
flinkbot commented on pull request #13322: URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976 ## CI report: * ef15d07af7eff785478fb42a3cfb67a7463aeb06 UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tzulitai commented on a change in pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.
tzulitai commented on a change in pull request #13015: URL: https://github.com/apache/flink/pull/13015#discussion_r483345056 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java ## @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * This is a configuration class for enhanced fan-out components. 
+ */ +public class FanOutRecordPublisherConfiguration { + + /** +* The efo registration type for de-/registration of streams. +*/ + private final EFORegistrationType efoRegistrationType; + + /** +* The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER. +*/ + @Nullable + private String consumerName; + + /** +* The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE +*/ + @Nullable + private Map streamConsumerArns; + + /** +* Base backoff millis for the deregister stream operation. +*/ + private final int subscribeToShardMaxRetries; + + /** +* Maximum backoff millis for the subscribe to shard operation. +*/ + private final long subscribeToShardMaxBackoffMillis; + + /** +* Base backoff millis for the subscribe to shard operation. +*/ + private final long subscribeToShardBaseBackoffMillis; + + /** +* Exponential backoff power constant for the subscribe to shard operation. +*/ + private final double subscribeToShardExpConstant; + + /** +* Base backoff millis for the register stream operation. +*/ + private final long registerStreamBaseBackoffMillis; + + /** +* Maximum backoff millis for the register stream operation. +*/ + private final long registerStreamMaxBackoffMillis; + + /** +* Exponential backoff power constant for the register stream operation. +*/ + private final double registerStreamExpConstant; + + /** +* Maximum retry attempts for the register stream operation. +*/ + private final int registerStreamMaxRetries; + + /** +* Base backoff millis for the deregister stream operation. +*/ + private final long deregisterStreamBaseBackoffMillis; + + /** +* Maximum backoff millis for the deregister stream operation. +*/ + private final long deregisterStreamMaxBackoffMillis; + + /** +* Exponential backoff power constant for the deregister stream operation. 
+*/ + private final double deregisterStreamExpConstant; + + /** +* Maximum retry attempts for the deregister stream operation. +*/ + private final int deregisterStreamMaxRetries; + + /** +* Max retries for the describe stream operation. +*/ + private final int describeStreamMaxRetries; + + /** +* Backoff millis for the describe stream operation. +*/ + private final long describeStreamBaseBackoffMillis; + + /** +* Maximum backoff millis for the describe stream operation. +*/ + private final long describeStreamMaxBackoffMillis; + + /** +* Exponential backoff power constant for the describe stream operation. +*/ + private final double
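The diff above wires up a base backoff, a maximum backoff, and an exponential power constant for each AWS operation (subscribe-to-shard, register, deregister, describe stream). As a hedged sketch of how such a triple typically combines into a per-attempt delay — capped exponential backoff; this mirrors the field naming in the diff but is not Flink's actual implementation:

```java
// Illustrative capped exponential backoff: delay = base * constant^attempt,
// clamped to the configured maximum. NOT Flink's actual retry code.
public class BackoffSketch {

    static long backoffMillis(long baseMillis, long maxMillis, double expConstant, int attempt) {
        double candidate = baseMillis * Math.pow(expConstant, attempt);
        return (long) Math.min(candidate, (double) maxMillis);
    }

    public static void main(String[] args) {
        // Example settings (assumed, not Flink defaults): base 1000 ms,
        // cap 60000 ms, exponential constant 1.5.
        for (int attempt = 0; attempt < 12; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + backoffMillis(1000L, 60000L, 1.5, attempt) + " ms");
        }
    }
}
```

Keeping base, max, and exponent as separate configuration knobs per operation — as the PR does — lets users tune aggressive retries for cheap calls (describe stream) independently of expensive ones (register/deregister consumer).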
[GitHub] [flink] flinkbot commented on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.
flinkbot commented on pull request #13322: URL: https://github.com/apache/flink/pull/13322#issuecomment-686856023 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit ef15d07af7eff785478fb42a3cfb67a7463aeb06 (Fri Sep 04 02:03:26 UTC 2020) **Warnings:** * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket. * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-17709).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-17709) Active Kubernetes integration phase 3 - Advanced Features
[ https://issues.apache.org/jira/browse/FLINK-17709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-17709: --- Labels: pull-request-available (was: ) > Active Kubernetes integration phase 3 - Advanced Features > - > > Key: FLINK-17709 > URL: https://issues.apache.org/jira/browse/FLINK-17709 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes, Runtime / Coordination >Reporter: Canbin Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > This is the umbrella issue to track all the advanced features for phase 3 of > active Kubernetes integration in Flink 1.12.0. Some of the features are as > follows: > # Support multiple JobManagers in ZooKeeper based HA setups. > # Support user-specified pod templates. > # Support FileSystem based high availability. > # Support running PyFlink. > # Support accessing secured services via K8s secrets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] shuiqiangchen opened a new pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.
shuiqiangchen opened a new pull request #13322: URL: https://github.com/apache/flink/pull/13322 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new 
feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18858) Kinesis Flink SQL Connector
[ https://issues.apache.org/jira/browse/FLINK-18858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190475#comment-17190475 ] Tzu-Li (Gordon) Tai commented on FLINK-18858: - [~danny.cranmer] I think a FLIP is not needed for the feature, since the addition should be fairly self-contained and does not affect much (if any) of the existing codebase. > Kinesis Flink SQL Connector > --- > > Key: FLINK-18858 > URL: https://issues.apache.org/jira/browse/FLINK-18858 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis, Table SQL / Ecosystem > Reporter: Waldemar Hummer > Priority: Major > > Hi all, > as far as I can see in the [list of > connectors|https://github.com/apache/flink/tree/master/flink-connectors], we > have a > {{[flink-connector-kinesis|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis]}} > for *programmatic access* to Kinesis streams, but there does not yet seem to > exist a *Kinesis SQL connector* (something like > {{flink-sql-connector-kinesis}}, analogous to {{flink-sql-connector-kafka}}). > Our use case would be to enable SQL queries with direct access to Kinesis > sources (and potentially sinks), to enable something like the following Flink > SQL queries: > {code:java} > $ bin/sql-client.sh embedded > ... > Flink SQL> CREATE TABLE Orders(`user` string, amount int, rowtime TIME) WITH > ('connector' = 'kinesis', ...); > ... > Flink SQL> SELECT * FROM Orders ...; > ...{code} > > I was wondering if this is something that has been considered, or is already > actively being worked on? If one of you can provide some guidance, we may be > able to work on a PoC implementation to add this functionality. > > (Wasn't able to find an existing issue in the backlog - if this is a > duplicate, then please let me know as well.) > Thanks! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19136) MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the predicate within the allowed time"
Dian Fu created FLINK-19136:
---

Summary: MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the predicate within the allowed time"
Key: FLINK-19136
URL: https://issues.apache.org/jira/browse/FLINK-19136
Project: Flink
Issue Type: Bug
Components: Runtime / Metrics
Affects Versions: 1.12.0
Reporter: Dian Fu

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6179=logs=9fca669f-5c5f-59c7-4118-e31c641064f0=91bf6583-3fb2-592f-e4d4-d79d79c3230a]

{code}
2020-09-03T23:33:18.3687261Z [ERROR] testReporter(org.apache.flink.metrics.tests.MetricsAvailabilityITCase)  Time elapsed: 15.217 s  <<< ERROR!
2020-09-03T23:33:18.3698260Z java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not satisfy the predicate within the allowed time.
2020-09-03T23:33:18.3698749Z 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-09-03T23:33:18.3699163Z 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
2020-09-03T23:33:18.3699754Z 	at org.apache.flink.metrics.tests.MetricsAvailabilityITCase.fetchMetric(MetricsAvailabilityITCase.java:162)
2020-09-03T23:33:18.3700234Z 	at org.apache.flink.metrics.tests.MetricsAvailabilityITCase.checkJobManagerMetricAvailability(MetricsAvailabilityITCase.java:116)
2020-09-03T23:33:18.3700726Z 	at org.apache.flink.metrics.tests.MetricsAvailabilityITCase.testReporter(MetricsAvailabilityITCase.java:101)
2020-09-03T23:33:18.3701097Z 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-09-03T23:33:18.3701425Z 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-09-03T23:33:18.3701798Z 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-09-03T23:33:18.3702146Z 	at java.lang.reflect.Method.invoke(Method.java:498)
2020-09-03T23:33:18.3702471Z 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-09-03T23:33:18.3702866Z 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-09-03T23:33:18.3703253Z 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-09-03T23:33:18.3703621Z 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-09-03T23:33:18.3703997Z 	at org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-09-03T23:33:18.3704339Z 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
2020-09-03T23:33:18.3704629Z 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
2020-09-03T23:33:18.3704940Z 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2020-09-03T23:33:18.3705354Z 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2020-09-03T23:33:18.3705725Z 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2020-09-03T23:33:18.3706072Z 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-09-03T23:33:18.3706397Z 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-09-03T23:33:18.3706714Z 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-09-03T23:33:18.3707044Z 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-09-03T23:33:18.3707373Z 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-09-03T23:33:18.3707708Z 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-09-03T23:33:18.3708073Z 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2020-09-03T23:33:18.3708410Z 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-09-03T23:33:18.3708691Z 	at org.junit.runners.Suite.runChild(Suite.java:128)
2020-09-03T23:33:18.3708976Z 	at org.junit.runners.Suite.runChild(Suite.java:27)
2020-09-03T23:33:18.3709273Z 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-09-03T23:33:18.3709579Z 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-09-03T23:33:18.3709910Z 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-09-03T23:33:18.3710242Z 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-09-03T23:33:18.3710554Z 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-09-03T23:33:18.3710875Z 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-09-03T23:33:18.3711203Z 	at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
2020-09-03T23:33:18.3711585Z 	at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
{code}
[jira] [Updated] (FLINK-19136) MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the predicate within the allowed time"
[ https://issues.apache.org/jira/browse/FLINK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19136: Labels: test-stability (was: ) > MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the > predicate within the allowed time" > > > Key: FLINK-19136 > URL: https://issues.apache.org/jira/browse/FLINK-19136 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Major > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6179=logs=9fca669f-5c5f-59c7-4118-e31c641064f0=91bf6583-3fb2-592f-e4d4-d79d79c3230a] > {code} > 2020-09-03T23:33:18.3687261Z [ERROR] > testReporter(org.apache.flink.metrics.tests.MetricsAvailabilityITCase) Time > elapsed: 15.217 s <<< ERROR! > 2020-09-03T23:33:18.3698260Z java.util.concurrent.ExecutionException: > org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not > satisfy the predicate within the allowed time. 
> {code}
[jira] [Commented] (FLINK-18815) AbstractCloseableRegistryTest.testClose unstable
[ https://issues.apache.org/jira/browse/FLINK-18815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190472#comment-17190472 ] Dian Fu commented on FLINK-18815: - Another instance: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6180=logs=3b6ec2fd-a816-5e75-c775-06fb87cb6670=b33fdd4f-3de5-542e-2624-5d53167bb672] > AbstractCloseableRegistryTest.testClose unstable > > > Key: FLINK-18815 > URL: https://issues.apache.org/jira/browse/FLINK-18815 > Project: Flink > Issue Type: Bug > Components: FileSystems, Tests >Affects Versions: 1.10.1, 1.12.0, 1.11.1 >Reporter: Robert Metzger >Assignee: Kezhu Wang >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.10.2, 1.12.0, 1.11.2 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5164=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0 > {code} > [ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.509 > s <<< FAILURE! - in org.apache.flink.core.fs.SafetyNetCloseableRegistryTest > [ERROR] testClose(org.apache.flink.core.fs.SafetyNetCloseableRegistryTest) > Time elapsed: 1.15 s <<< FAILURE! > java.lang.AssertionError: expected:<0> but was:<-1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.flink.core.fs.AbstractCloseableRegistryTest.testClose(AbstractCloseableRegistryTest.java:93) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-19118) Support Expression in the operations of the Python Table API
[ https://issues.apache.org/jira/browse/FLINK-19118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-19118. --- Resolution: Fixed Fixed in Master: a8cc62a901dabe6c4d877b97db6024715b68174a > Support Expression in the operations of the Python Table API > > > Key: FLINK-19118 > URL: https://issues.apache.org/jira/browse/FLINK-19118 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > Currently, it only supports string in the operations of the Python Table API. > For example: > {code} > >>> tab.group_by("key").select("key, value.avg") > {code} > After introducing the Expression class in FLINK-19114, it's possible to > support > to use Expression in the operations in the Python Table API, e.g. > {code} > >>> tab.group_by(col("key")).select(col("key"), col("value").avg) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
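The API change above — replacing descriptor strings such as {{"key, value.avg"}} with composable expression objects built by {{col(...)}} — can be illustrated with a toy Python stand-in. This is not the real pyflink `Expression` class; the names and the string-based rendering are purely illustrative of the idea:

```python
class Expression:
    """Toy stand-in for an expression-DSL node (not the pyflink API)."""

    def __init__(self, rendered: str):
        self._rendered = rendered

    @property
    def avg(self) -> "Expression":
        # Chaining builds a new structured expression instead of
        # requiring the user to concatenate strings like "value.avg".
        return Expression(f"avg({self._rendered})")

    def __repr__(self) -> str:
        return self._rendered


def col(name: str) -> Expression:
    """Reference a column by name, analogous in spirit to pyflink's `col`."""
    return Expression(name)


# String API:      tab.group_by("key").select("key, value.avg")
# Expression API:  tab.group_by(col("key")).select(col("key"), col("value").avg)
exprs = [col("key"), col("value").avg]
assert [repr(e) for e in exprs] == ["key", "avg(value)"]
```

The design benefit over plain strings is that expressions are first-class objects: they can be stored, composed, and validated before the query is translated, rather than being re-parsed from free-form text.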
[GitHub] [flink] sunjincheng121 merged pull request #13304: [FLINK-19118][python] Support Expression in the operations of Python Table API
sunjincheng121 merged pull request #13304: URL: https://github.com/apache/flink/pull/13304 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sunjincheng121 commented on pull request #13304: [FLINK-19118][python] Support Expression in the operations of Python Table API
sunjincheng121 commented on pull request #13304: URL: https://github.com/apache/flink/pull/13304#issuecomment-686839232 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts
[ https://issues.apache.org/jira/browse/FLINK-19069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190447#comment-17190447 ] Kenneth William Krugler commented on FLINK-19069: - See also [https://kb.databricks.com/data/append-slow-with-spark-2.0.0.html] > finalizeOnMaster takes too much time and client timeouts > > > Key: FLINK-19069 > URL: https://issues.apache.org/jira/browse/FLINK-19069 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0 > Reporter: Jiayi Liao > Priority: Critical > Fix For: 1.12.0, 1.11.2, 1.10.3 > > > Currently we execute {{finalizeOnMaster}} in JM's main thread, which may > block the JM for a very long time, so that the client eventually times out. > For example, we'd like to write data to HDFS and commit files on JM, which > takes more than ten minutes to commit tens of thousands of files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
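The failure mode described in FLINK-19069 — a long-running {{finalizeOnMaster}} occupying the coordinator's main thread until the client times out — can be sketched outside of Flink. The following Python stand-in (not Flink's actual JobManager code; all names are illustrative) shows why offloading the blocking commit work to a separate executor keeps the main thread free to keep answering clients:

```python
import concurrent.futures
import time


def finalize_on_master(num_files: int) -> int:
    """Stand-in for a slow finalizeOnMaster that commits many files."""
    for _ in range(num_files):
        time.sleep(0.001)  # pretend each file commit takes a moment
    return num_files


# If this ran directly on the "main thread", nothing else (e.g. client
# heartbeats) could be served until every file was committed.
# Offloading it to an I/O executor keeps the main thread responsive:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(finalize_on_master, 100)

heartbeats_served = 0
while not future.done():
    heartbeats_served += 1  # the main thread is still free to do work
    time.sleep(0.001)

assert future.result() == 100  # the finalization still completed
executor.shutdown()
```

The design point is the same one the issue raises: blocking work belongs off the coordinator's single-threaded event loop, with the result delivered back asynchronously.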
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178) * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sjwiesman commented on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-686819620 You can apply this patch to SavepointWriterITCase to see the failure. [SavepointWriterITCase.patch.txt](https://github.com/apache/flink/files/5171892/SavepointWriterITCase.patch.txt) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483299944 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; Review comment: This package is for user interfaces. This is an internal class and should be in the `output` package. Same with the other file. ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extracts all file paths that are part of the provided {@link OperatorState}. + */ +public class StatePathExtractor implements FlatMapFunction { Review comment: Mark as @Internal ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * This mapper copies files from an existing savepoint into a new directory. + */ +public final class FileCopyMapFunction implements MapFunction { Review comment: Internal class with public scope should be marked @Internal ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150) * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178) * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150) * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178) * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN * 0ac70b7637256b8b4320e509c0649075c7c0aabf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150) * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178) * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator
flinkbot edited a comment on pull request #13181: URL: https://github.com/apache/flink/pull/13181#issuecomment-675091412 ## CI report: * e7553689356882de1ffe606400d1255d1d757bc4 UNKNOWN * a96b2db52a0db507e0077266c8e9cb947413e1ba Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6176) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483220192 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ +public class FileCopyRichMapFunction extends RichMapFunction { + // the destination path to copy file + private String path; + + public FileCopyRichMapFunction(String path) { + this.path = path; + } + + @Override + public void open(Configuration configuration) throws Exception { + // create the parent dir only in the first subtask + if (getRuntimeContext().getIndexOfThisSubtask() == 0) { Review comment: Changed to `MapFunction`, as we do not guarantee all `open()` are executed before all `map()` in all subtasks. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
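The ordering hazard behind this change — one subtask's `open()` creating the parent directory while other subtasks may already be executing `map()` — can be illustrated with a plain Python sketch (hypothetical names, not the Flink API): making directory creation idempotent inside every copy call removes any dependency on which subtask runs first.

```python
import os
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor


def copy_file(src: str, dst_dir: str) -> str:
    """Copy one state file; every 'subtask' ensures the target dir exists.

    os.makedirs(..., exist_ok=True) is idempotent and safe under races,
    so no subtask has to wait for another subtask's setup to run first.
    """
    os.makedirs(dst_dir, exist_ok=True)
    dst = os.path.join(dst_dir, os.path.basename(src))
    shutil.copyfile(src, dst)
    return dst


# Prepare some source "state files" in a scratch directory.
src_dir = tempfile.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), "new-savepoint")
srcs = []
for i in range(8):
    p = os.path.join(src_dir, f"state-{i}")
    with open(p, "w") as f:
        f.write(f"data-{i}")
    srcs.append(p)

# Parallel "subtasks" racing to copy into the same destination directory.
with ThreadPoolExecutor(max_workers=4) as pool:
    copied = list(pool.map(lambda s: copy_file(s, dst_dir), srcs))

assert sorted(os.listdir(dst_dir)) == sorted(f"state-{i}" for i in range(8))
```

This mirrors the reviewer's conclusion: instead of relying on subtask 0's `open()` having run, each map invocation performs (or safely re-performs) the setup it needs.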
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483216756 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java ## @@ -90,38 +93,37 @@ public final void write(String path) { List existingOperators = metadata.getExistingOperators(); - DataSet finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators); - - finalOperatorStates - .reduceGroup(new MergeOperatorStates(metadata.getMasterStates())) - .name("reduce(OperatorState)") - .output(new SavepointOutputFormat(savepointPath)) - .name(path); - } - - private DataSet unionOperatorStates(DataSet newOperatorStates, List existingOperators) { DataSet finalOperatorStates; if (existingOperators.isEmpty()) { finalOperatorStates = newOperatorStates; } else { - DataSet wrappedCollection = newOperatorStates - .getExecutionEnvironment() + DataSet existingOperatorStates = newOperatorStates.getExecutionEnvironment() .fromCollection(existingOperators); - finalOperatorStates = newOperatorStates.union(wrappedCollection); + existingOperatorStates + .flatMap(new StatePathExtractor()) Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483214542 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java ## @@ -90,38 +93,37 @@ public final void write(String path) { List existingOperators = metadata.getExistingOperators(); - DataSet finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators); - - finalOperatorStates - .reduceGroup(new MergeOperatorStates(metadata.getMasterStates())) - .name("reduce(OperatorState)") - .output(new SavepointOutputFormat(savepointPath)) - .name(path); - } - - private DataSet unionOperatorStates(DataSet newOperatorStates, List existingOperators) { DataSet finalOperatorStates; if (existingOperators.isEmpty()) { finalOperatorStates = newOperatorStates; } else { - DataSet wrappedCollection = newOperatorStates - .getExecutionEnvironment() + DataSet existingOperatorStates = newOperatorStates.getExecutionEnvironment() .fromCollection(existingOperators); - finalOperatorStates = newOperatorStates.union(wrappedCollection); + existingOperatorStates + .flatMap(new StatePathExtractor()) Review comment: I misunderstood. It is for `.flatMap(new StatePathExtractor())`, not file copy. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483212926 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ +public final class FileCopyRichMapFunction extends RichMapFunction { + // the destination path to copy file + private final String path; + + public FileCopyRichMapFunction(String path) { + this.path = path; Review comment: Good point! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
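The "Good point!" above accepts the reviewer's suggestion to validate the constructor argument with `Preconditions.checkNotNull`. A minimal standalone sketch of the same fail-fast pattern, using the JDK's `Objects.requireNonNull` (which Flink's `Preconditions.checkNotNull` mirrors); the class name here is illustrative, not the PR's class:

```java
import java.util.Objects;

/**
 * Sketch of the fail-fast null check suggested in the review.
 * Rejecting a null destination at construction time surfaces the
 * error immediately on the client, rather than as a harder-to-trace
 * NullPointerException inside a running task.
 */
public class DestinationPath {
    private final String path;

    public DestinationPath(String path) {
        // fails here, with a descriptive message, instead of later at use time
        this.path = Objects.requireNonNull(path, "The destination path cannot be null");
    }

    public String get() {
        return path;
    }
}
```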
[GitHub] [flink] flinkbot edited a comment on pull request #13284: [FLINK-17016][runtime] Enable pipelined region scheduling
flinkbot edited a comment on pull request #13284: URL: https://github.com/apache/flink/pull/13284#issuecomment-683683000 ## CI report: * 71281ac4921c174c214f2393e169e7140698af2d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6177) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
flinkbot edited a comment on pull request #13309: URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869 ## CI report: * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150) * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 UNKNOWN
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483212773 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extract from an OperatorState a set of state file paths. + */ +public class StatePathExtractor implements FlatMapFunction { Review comment: done. This is an automated message from the Apache Git Service. 
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483211627 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ Review comment: Looks better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483211813 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extract from an OperatorState a set of state file paths. Review comment: Looks better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r48328 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java ## @@ -90,38 +93,37 @@ public final void write(String path) { List existingOperators = metadata.getExistingOperators(); - DataSet finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators); - - finalOperatorStates - .reduceGroup(new MergeOperatorStates(metadata.getMasterStates())) - .name("reduce(OperatorState)") - .output(new SavepointOutputFormat(savepointPath)) - .name(path); - } - - private DataSet unionOperatorStates(DataSet newOperatorStates, List existingOperators) { DataSet finalOperatorStates; if (existingOperators.isEmpty()) { finalOperatorStates = newOperatorStates; } else { - DataSet wrappedCollection = newOperatorStates - .getExecutionEnvironment() + DataSet existingOperatorStates = newOperatorStates.getExecutionEnvironment() .fromCollection(existingOperators); - finalOperatorStates = newOperatorStates.union(wrappedCollection); + existingOperatorStates + .flatMap(new StatePathExtractor()) Review comment: If we have a huge savepoint (e.g., in TB range) to process, do we also do with parallelism=1? meaning only one thread does the file copy, potentially many of them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
qinjunjerry commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483211312 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ +public final class FileCopyRichMapFunction extends RichMapFunction { Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13321: [FLINK-14870] Remove nullable assumption of task slot sharing group
flinkbot edited a comment on pull request #13321: URL: https://github.com/apache/flink/pull/13321#issuecomment-686567896 ## CI report: * d79f152fa91b8bc555af6fb2a00a8d62b184be5a UNKNOWN * 7039064922aaec22752ac84e4e9d41d663e68a14 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6175)
[GitHub] [flink] sjwiesman commented on pull request #13179: [FLINK-18978][state-backends] Support full table scan of key and namespace from statebackend
sjwiesman commented on pull request #13179: URL: https://github.com/apache/flink/pull/13179#issuecomment-686698188 @myasuka please take another look when you have the time
[jira] [Commented] (FLINK-18858) Kinesis Flink SQL Connector
[ https://issues.apache.org/jira/browse/FLINK-18858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190366#comment-17190366 ] Danny Cranmer commented on FLINK-18858: --- Thanks [~tzulitai]. Hello [~whummer], I am a member of the Kinesis team at AWS. It is great to see interest in SQL support for Kinesis streams. I would be happy to collaborate with you on this feature. Have you made any progress on your PoC so far? It would be good to arrange a call to discuss the way forward, feel free to drop me an email at cranm...@amazon.com. I look forward to hearing back! [~tzulitai]/[~rmetzger] would you expect a FLIP for this, or is the Jira sufficient? > Kinesis Flink SQL Connector > --- > > Key: FLINK-18858 > URL: https://issues.apache.org/jira/browse/FLINK-18858 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis, Table SQL / Ecosystem >Reporter: Waldemar Hummer >Priority: Major > > Hi all, > as far as I can see in the [list of > connectors|https://github.com/apache/flink/tree/master/flink-connectors], we > have a > {{[flink-connector-kinesis|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis]}} > for *programmatic access* to Kinesis streams, but there does not yet seem to > exist a *Kinesis SQL connector* (something like > {{flink-sql-connector-kinesis}}, analogous to {{flink-sql-connector-kafka}}). > Our use case would be to enable SQL queries with direct access to Kinesis > sources (and potentially sinks), to enable something like the following Flink > SQL queries: > {code:java} > $ bin/sql-client.sh embedded > ... > Flink SQL> CREATE TABLE Orders(`user` string, amount int, rowtime TIME) WITH > ('connector' = 'kinesis', ...); > ... > Flink SQL> SELECT * FROM Orders ...; > ...{code} > > I was wondering if this is something that has been considered, or is already > actively being worked on? 
If one of you can provide some guidance, we may be > able to work on a PoC implementation to add this functionality. > > (Wasn't able to find an existing issue in the backlog - if this is a > duplicate, then please let me know as well.) > Thanks! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly
[ https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190358#comment-17190358 ] Kenneth William Krugler commented on FLINK-19133: - Yes, we ran into this exact issue recently with 1.11, where only partition 0 was receiving data. "Fixed" it by passing Optional.empty() for the partition, so it would use Kafka partitioning vs. the FlinkFixedPartitioner, but good to see we weren't imagining things. > User provided kafka partitioners are not initialized correctly > -- > > Key: FLINK-19133 > URL: https://issues.apache.org/jira/browse/FLINK-19133 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.11.1 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.12.0, 1.11.2 > > > Reported in the ML: > https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E > If a user provides a partitioner in combination with SerializationSchema it > is not initialized correctly and has no access to the parallel instance index > or number of parallel instances. -- This message was sent by Atlassian Jira (v8.3.4#803005)
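The symptom described above (only partition 0 receiving data) follows directly from the partitioner never being opened. A simplified stand-in for the `FlinkFixedPartitioner` contract — an illustration, not the actual Flink class — shows why: if the framework never calls `open()`, the subtask index keeps its default of 0 in every parallel instance, so every instance pins itself to the same partition.

```java
/**
 * Simplified stand-in (not the real Flink class) for the
 * FlinkFixedPartitioner contract, illustrating FLINK-19133:
 * when open() is never called, every subtask keeps the default
 * index 0 and all records land on Kafka partition 0.
 */
public class FixedPartitionerSketch {
    // set by open(); stays 0 if the framework skips initialization
    private int parallelInstanceId;

    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(int[] kafkaPartitions) {
        // each subtask pins itself to one Kafka partition by its index
        return kafkaPartitions[parallelInstanceId % kafkaPartitions.length];
    }
}
```

This also explains the workaround in the comment above: passing no partitioner at all falls back to Kafka's own partitioning, sidestepping the uninitialized index entirely.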
[GitHub] [flink] flinkbot edited a comment on pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerComponent
flinkbot edited a comment on pull request #13319: URL: https://github.com/apache/flink/pull/13319#issuecomment-686508323 ## CI report: * 9fab4d5bbccc29d78dee92726cce852421e40541 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6170)
[GitHub] [flink] flinkbot edited a comment on pull request #13320: [FLINK-19035] Remove fold from DataStream API
flinkbot edited a comment on pull request #13320: URL: https://github.com/apache/flink/pull/13320#issuecomment-686520258 ## CI report: * a424075cc80ced3f5620b580134393213cc378a5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6172)
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483158834 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extract from an OperatorState a set of state file paths. + */ +public class StatePathExtractor implements FlatMapFunction { Review comment: Add a serialVersionUID This is an automated message from the Apache Git Service. 
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483158317 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extract from an OperatorState a set of state file paths. Review comment: ```suggestion * Extracts all file paths that are part of the provided {@link OperatorState} ``` This is an automated message from the Apache Git Service. 
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483157850 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ +public final class FileCopyRichMapFunction extends RichMapFunction { + // the destination path to copy file + private final String path; + + public FileCopyRichMapFunction(String path) { + this.path = path; Review comment: ```suggestion this.path = Preconditions.checkNotNull(path, "The destination path cannot be null"); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483157398 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ Review comment: ```suggestion /** * This mapper copies files from an existing savepoint into a new directory. */ ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483156521 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * A {@link RichMapFunction} for copying files. + */ +public final class FileCopyRichMapFunction extends RichMapFunction { Review comment: Add a serialVersionUID This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
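The "Add a serialVersionUID" request above matters because Flink distributes user functions to TaskManagers via Java serialization. A minimal sketch (illustrative class, not the PR's function) of declaring an explicit `serialVersionUID`, which keeps differently compiled builds of the same class compatible instead of relying on a compiler-derived id:

```java
import java.io.Serializable;

/**
 * Sketch of the serialVersionUID suggestion: an explicit id pins the
 * class's serialized form, so a recompiled build can still deserialize
 * instances produced by an older build of the same class.
 */
public class CopyConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String destination;

    public CopyConfig(String destination) {
        this.destination = destination;
    }

    public String destination() {
        return destination;
    }
}
```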
[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy
sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483156392

## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java

@@ -90,38 +93,37 @@ public final void write(String path) { List existingOperators = metadata.getExistingOperators(); - DataSet finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators); - - finalOperatorStates - .reduceGroup(new MergeOperatorStates(metadata.getMasterStates())) - .name("reduce(OperatorState)") - .output(new SavepointOutputFormat(savepointPath)) - .name(path); - } - - private DataSet unionOperatorStates(DataSet newOperatorStates, List existingOperators) { DataSet finalOperatorStates; if (existingOperators.isEmpty()) { finalOperatorStates = newOperatorStates; } else { - DataSet wrappedCollection = newOperatorStates - .getExecutionEnvironment() + DataSet existingOperatorStates = newOperatorStates.getExecutionEnvironment() .fromCollection(existingOperators); - finalOperatorStates = newOperatorStates.union(wrappedCollection); + existingOperatorStates + .flatMap(new StatePathExtractor())

Review comment: We need to pin the parallelism of this to `1`. The collection source only runs at parallelism `1`, so it will chain, and then the paths will redistribute to the copy method. Otherwise we are not properly distributing this work.

```suggestion
				.flatMap(new StatePathExtractor())
				.setParallelism(1)
```
[GitHub] [flink] flinkbot edited a comment on pull request #13305: [FLINK-19109][task] Ignore isLoopRunning in MailboxExecutor.isIdle
flinkbot edited a comment on pull request #13305: URL: https://github.com/apache/flink/pull/13305#issuecomment-685657746 ## CI report: * 380b8d7be6ae142a27061c6d529c2ca059a51aa5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6168) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint
flinkbot edited a comment on pull request #13316: URL: https://github.com/apache/flink/pull/13316#issuecomment-686398901 ## CI report: * e37cb771c66ed8cab48e0b7abd53132fb15dfca3 UNKNOWN * 5ccdc55dc70a57332c50bd59c0a8e6e59a29bffd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6169)
[GitHub] [flink] flinkbot edited a comment on pull request #13128: [FLINK-18795][hbase] Support for HBase 2
flinkbot edited a comment on pull request #13128: URL: https://github.com/apache/flink/pull/13128#issuecomment-672766836 ## CI report: * 313b80f4474455d7013b0852929b5a8458f391a1 UNKNOWN * ece578a9fdbaee4d815de501187d92a729790c9b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6174)
[GitHub] [flink] rkhachatryan commented on pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean
rkhachatryan commented on pull request #13040: URL: https://github.com/apache/flink/pull/13040#issuecomment-686639977

Thanks for updating your PR @echauchot. There is no cyclic dependency now and `CompletedCheckpointStore`s are not affected!

I'm not sure I understood you about having discard logic in `Pending/CompletedCheckpoint` classes. I'd prefer not to put execution (or any) logic there, as their concern IMO is to just represent checkpoints. I think something like this should be possible:

```
public class CheckpointsCleaner {
    public void clean(Runnable cleanAction, Runnable postCleanAction) {
        counter.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                counter.decrementAndGet();
                postCleanAction.run();
            }
        });
    }
}
```

Then in `CheckpointCoordinator`:

```
pendingCheckpoint.finalizeCheckpoint(checkpointsCleaner::clean) // CompletedCheckpoint saves this callback in a field
```

And in `CompletedCheckpoint`:

```
void asyncDiscardCheckpointAndCountCheckpoint(Consumer discardAction) {
    discardCallbackRunner.accept(
        () -> discardAction.accept(this),
        checkpointCleaningFinishedCallback);
}
```

WDYT?
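The counter/executor pattern proposed in the comment above can be tried out with plain JDK types. Below is a self-contained, illustrative sketch, not the actual PR code: the `numberOfCheckpointsToClean` field name, the constructor, and the getter are assumptions filled in for the example (the original snippet leaves `counter` and `executor` implicit).

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative cleaner: tracks how many checkpoint discards are still in
// flight, which is the quantity a backpressure check would inspect.
class CheckpointsCleaner {
    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();
    private final Executor executor;

    CheckpointsCleaner(Executor executor) {
        this.executor = executor;
    }

    int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }

    void clean(Runnable cleanAction, Runnable postCleanAction) {
        numberOfCheckpointsToClean.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                // Decrement before the post-clean callback so the callback
                // observes the updated in-flight count.
                numberOfCheckpointsToClean.decrementAndGet();
                postCleanAction.run();
            }
        });
    }
}
```

With a direct executor (`Runnable::run`) the cleaning runs synchronously, which makes the counter easy to observe; in the real coordinator an I/O thread pool would be passed in instead.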
[jira] [Updated] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
[ https://issues.apache.org/jira/browse/FLINK-19135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-19135: - Issue Type: Bug (was: Improvement) > (Stream)ExecutionEnvironment.execute() should not throw ExecutionException > -- > > Key: FLINK-19135 > URL: https://issues.apache.org/jira/browse/FLINK-19135 > Project: Flink > Issue Type: Bug > Components: API / DataSet, API / DataStream >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > > In FLINK-14850 we changed the {{execute()}} method to be basically > {code} > final JobClient jobClient = executeAsync(...); > return jobClient.getJobExecutionResult(userClassloader).get(); > {code} > Unfortunately, this means that {{execute()}} now throws an > {{ExecutionException}} instead of a {{ProgramInvocationException}} or > {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping > the other exceptions that we were throwing before. > We didn't notice this in tests because most tests use > {{Test(Stream)Environment}} which overrides the {{execute()}} method and so > doesn't go through the {{PipelineExecutor}} logic or the normal code path of > delegating to {{executeAsync()}}. > We should fix this to go back to the old behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
Aljoscha Krettek created FLINK-19135: Summary: (Stream)ExecutionEnvironment.execute() should not throw ExecutionException Key: FLINK-19135 URL: https://issues.apache.org/jira/browse/FLINK-19135 Project: Flink Issue Type: Improvement Components: API / DataSet, API / DataStream Reporter: Aljoscha Krettek In FLINK-14850 we changed the {{execute()}} method to be basically {code} final JobClient jobClient = executeAsync(...); return jobClient.getJobExecutionResult(userClassloader).get(); {code} Unfortunately, this means that {{execute()}} now throws an {{ExecutionException}} instead of a {{ProgramInvocationException}} or {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping the other exceptions that we were throwing before. We didn't notice this in tests because most tests use {{Test(Stream)Environment}} which overrides the {{execute()}} method and so doesn't go through the {{PipelineExecutor}} logic or the normal code path of delegating to {{executeAsync()}}. We should fix this to go back to the old behaviour.
[jira] [Assigned] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
[ https://issues.apache.org/jira/browse/FLINK-19135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-19135: Assignee: Aljoscha Krettek > (Stream)ExecutionEnvironment.execute() should not throw ExecutionException > -- > > Key: FLINK-19135 > URL: https://issues.apache.org/jira/browse/FLINK-19135 > Project: Flink > Issue Type: Improvement > Components: API / DataSet, API / DataStream >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > > In FLINK-14850 we changed the {{execute()}} method to be basically > {code} > final JobClient jobClient = executeAsync(...); > return jobClient.getJobExecutionResult(userClassloader).get(); > {code} > Unfortunately, this means that {{execute()}} now throws an > {{ExecutionException}} instead of a {{ProgramInvocationException}} or > {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping > the other exceptions that we were throwing before. > We didn't notice this in tests because most tests use > {{Test(Stream)Environment}} which overrides the {{execute()}} method and so > doesn't go through the {{PipelineExecutor}} logic or the normal code path of > delegating to {{executeAsync()}}. > We should fix this to go back to the old behaviour.
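The wrapping behavior described in FLINK-19135 above is simply how `CompletableFuture.get()` works: whatever completed the future exceptionally is rewrapped in an `ExecutionException`. The sketch below demonstrates the unwrap-and-rethrow fix with plain JDK futures; `JobFailedException` and `ExecuteHelper` are hypothetical names for the example, not Flink classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for the exception types execute() used to throw.
class JobFailedException extends RuntimeException {
    JobFailedException(String msg) { super(msg); }
}

final class ExecuteHelper {
    // Mimics what execute() could do to restore the old behaviour: block on
    // the async result, and rethrow the *cause* instead of letting callers
    // see the ExecutionException wrapper that CompletableFuture.get() adds.
    static <T> T getUnwrapped(CompletableFuture<T> future) throws Throwable {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw e.getCause();
        }
    }
}
```

Callers then catch the original exception type directly, matching the pre-FLINK-14850 contract.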
[GitHub] [flink] flinkbot edited a comment on pull request #13305: [FLINK-19109][task] Ignore isLoopRunning in MailboxExecutor.isIdle
flinkbot edited a comment on pull request #13305: URL: https://github.com/apache/flink/pull/13305#issuecomment-685657746 ## CI report: * b37d13c99ca3f68f80b26a4f97371d309b124810 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6160) * 380b8d7be6ae142a27061c6d529c2ca059a51aa5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6168)
[GitHub] [flink] flinkbot edited a comment on pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint
flinkbot edited a comment on pull request #13316: URL: https://github.com/apache/flink/pull/13316#issuecomment-686398901 ## CI report: * e37cb771c66ed8cab48e0b7abd53132fb15dfca3 UNKNOWN * a7f19f060e5ec83090a5265e9be287bd8dbe464e Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6163) * 5ccdc55dc70a57332c50bd59c0a8e6e59a29bffd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6169)
[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking
flinkbot edited a comment on pull request #13217: URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884 ## CI report: * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN * 8036016c752bce433dc65d1c08695377c917836f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6162)
[GitHub] [flink] flinkbot edited a comment on pull request #13227: [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph
flinkbot edited a comment on pull request #13227: URL: https://github.com/apache/flink/pull/13227#issuecomment-679046587 ## CI report: * a439269835304a2a64316bcc6779c8997ba4716d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6159)
[jira] [Commented] (FLINK-18815) AbstractCloseableRegistryTest.testClose unstable
[ https://issues.apache.org/jira/browse/FLINK-18815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190276#comment-17190276 ] Kezhu Wang commented on FLINK-18815: It seems that the two recent cases are leaks, while the previous cases were duplicate closes. I think these two cases are caused by {{SafetyNetCloseableRegistry.close}}, which interrupts the reaper thread. Suppose that:
1. A closeable becomes phantom reachable and is queued in {{CloseableReaperThread.referenceQueue}}, but does not get a chance to be closed.
2. {{SafetyNetCloseableRegistry.close}} calls {{CloseableReaperThread.interrupt}}, which sets {{CloseableReaperThread.running}} to false and interrupts that Java thread.
3. {{CloseableReaperThread}} terminates, either because {{CloseableReaperThread.running}} is false or because of the {{InterruptedException}}.
4. The enqueued closeable leaks.
I think there are two different approaches to fix this issue:
* Use at most one {{CloseableReaperThread}}, and don't close it. This may cause leaks if Flink is embedded as a guest in another host application.
* Count the registered phantom references, and close the reaper thread only once all registered phantom references have been popped and the {{CloseableReaperThread}} has been dropped by its caller.
Since Flink is not always the end application, I think the counting approach may be more appropriate. As an analogy, {{java.lang.ref.Cleaner}} has no close-like method: it [tracks all registered referents|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/PhantomCleanable.java#L65], and its underlying thread terminates only after [itself|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/CleanerImpl.java#L101] and [all registered references|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/CleanerImpl.java#L133] are cleaned. [~kevin.cyj] [~dian.fu] [~trohrmann] Any thoughts?
> AbstractCloseableRegistryTest.testClose unstable > > > Key: FLINK-18815 > URL: https://issues.apache.org/jira/browse/FLINK-18815 > Project: Flink > Issue Type: Bug > Components: FileSystems, Tests >Affects Versions: 1.10.1, 1.12.0, 1.11.1 >Reporter: Robert Metzger >Assignee: Kezhu Wang >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.10.2, 1.12.0, 1.11.2 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5164=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0 > {code} > [ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.509 > s <<< FAILURE! - in org.apache.flink.core.fs.SafetyNetCloseableRegistryTest > [ERROR] testClose(org.apache.flink.core.fs.SafetyNetCloseableRegistryTest) > Time elapsed: 1.15 s <<< FAILURE! > java.lang.AssertionError: expected:<0> but was:<-1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.flink.core.fs.AbstractCloseableRegistryTest.testClose(AbstractCloseableRegistryTest.java:93) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code}
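The counting approach proposed in the FLINK-18815 comment above can be illustrated with plain JDK types. The sketch below is deliberately simplified and is not Flink's actual `SafetyNetCloseableRegistry`: a `BlockingQueue` stands in for the phantom-reference `ReferenceQueue` so the behavior does not depend on GC timing, and every name in it is an assumption made for the example.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified reaper: shutdown is only a *request*, and the thread exits only
// once every registered closeable has been drained and closed. This models
// the "count registered phantom references" fix from the comment above.
class CountingReaper extends Thread {
    private final BlockingQueue<Closeable> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger registered = new AtomicInteger();
    private volatile boolean shutdownRequested;

    void register(Closeable c) {
        registered.incrementAndGet();
    }

    // In the real registry the garbage collector enqueues phantom references;
    // here the caller enqueues closeables explicitly to stay deterministic.
    void enqueue(Closeable c) {
        queue.add(c);
    }

    void requestShutdown() {
        shutdownRequested = true;
    }

    @Override
    public void run() {
        // Do NOT exit on shutdown alone: that is exactly the leak scenario in
        // steps 1-4 above. Exit only when nothing is outstanding.
        while (!(shutdownRequested && registered.get() == 0)) {
            try {
                Closeable c = queue.poll(10, TimeUnit.MILLISECONDS);
                if (c != null) {
                    try {
                        c.close();
                    } catch (IOException ignored) {
                        // best-effort cleanup
                    }
                    registered.decrementAndGet();
                }
            } catch (InterruptedException e) {
                // keep draining; outstanding closeables must not leak
            }
        }
    }
}
```

Even if `requestShutdown()` races ahead of the pending enqueues, the reaper keeps draining until the count reaches zero, which is the property the leaking test cases were missing.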
[GitHub] [flink] flinkbot edited a comment on pull request #13321: [FLINK-14870] Remove nullable assumption of task slot sharing group
flinkbot edited a comment on pull request #13321: URL: https://github.com/apache/flink/pull/13321#issuecomment-686567896 ## CI report: * d79f152fa91b8bc555af6fb2a00a8d62b184be5a UNKNOWN * 7039064922aaec22752ac84e4e9d41d663e68a14 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6175)
[GitHub] [flink] flinkbot edited a comment on pull request #13284: [FLINK-17016][runtime] Enable pipelined region scheduling
flinkbot edited a comment on pull request #13284: URL: https://github.com/apache/flink/pull/13284#issuecomment-683683000 ## CI report: * a541f5410ff96d39708288ae7aa672ed2ebb05a8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6135) * 71281ac4921c174c214f2393e169e7140698af2d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6177)
[GitHub] [flink] flinkbot edited a comment on pull request #13318: [BP-1.11][FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
flinkbot edited a comment on pull request #13318: URL: https://github.com/apache/flink/pull/13318#issuecomment-686450863 ## CI report: * 6c93ba2f6b0915301a85b181b895a4021eab897b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6161)
[GitHub] [flink] flinkbot edited a comment on pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator
flinkbot edited a comment on pull request #13181: URL: https://github.com/apache/flink/pull/13181#issuecomment-675091412 ## CI report: * e7553689356882de1ffe606400d1255d1d757bc4 UNKNOWN * 18f88af3b438b13e9a240efd2b4979f841d2b978 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5939) * a96b2db52a0db507e0077266c8e9cb947413e1ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6176)
[GitHub] [flink] tillrohrmann commented on a change in pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerCompone
tillrohrmann commented on a change in pull request #13319: URL: https://github.com/apache/flink/pull/13319#discussion_r483078088 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -194,8 +194,9 @@ public Dispatcher( public void onStart() throws Exception { try { startDispatcherServices(); - } catch (Exception e) { - final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e); + } catch (Throwable t) { + getTerminationFuture().completeExceptionally(t); Review comment: This should not be necessary. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ## @@ -513,10 +513,10 @@ public State terminate(AkkaRpcActor akkaRpcActor) { try { terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop(); } catch (Throwable t) { - terminationFuture = FutureUtils.completedExceptionally( - new AkkaRpcException( - String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()), - t)); + String errorMsg = String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()); + LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, t); + terminationFuture = FutureUtils.completedExceptionally(new AkkaRpcException(errorMsg, t)); + akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(t); Review comment: This should not be necessary. 
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java ## @@ -88,6 +95,18 @@ private void registerShutDownFuture() { FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture); + BiConsumer terminateAction = (ignored, throwable) -> { + if (throwable != null) { + shutDownFuture.completeExceptionally(throwable); + } else { + shutDownFuture.complete(ApplicationStatus.SUCCEEDED); + } + if (isRunning.get()) { + fatalErrorHandler.onFatalError(throwable); + } + }; + dispatcherRunner.getTerminationFuture().whenComplete(terminateAction); Review comment: I think the `DispatcherRunner.getTerminationFuture` won't complete if the `Dispatcher` terminates. The problem is that we only forward the result of the `DispatcherLeaderProcess.terminationFuture` to the `DispatcherRunner.terminationFuture` if we call `closeAsync`. I would suggest to add tests to verify the intended behavior. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java ## @@ -107,13 +112,18 @@ public void grantLeadership(UUID leaderSessionID) { } private void startNewDispatcherLeaderProcess(UUID leaderSessionID) { - stopDispatcherLeaderProcess(); + try { + stopDispatcherLeaderProcess(); - dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID); + dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID); - final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess; - FutureUtils.assertNoException( - previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start)); + final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess; + FutureUtils.assertNoException( + previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start)); + } catch (Throwable t) { + terminationFuture.completeExceptionally(t); + throw t; Review comment: Why is 
this needed? ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java ## @@ -88,6 +95,18 @@ private void registerShutDownFuture() { FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture); +