[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly

2020-09-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190573#comment-17190573
 ] 

Danny Chen commented on FLINK-19133:


Hi [~dwysakowicz], may I take this issue?

> User provided kafka partitioners are not initialized correctly
> --
>
> Key: FLINK-19133
> URL: https://issues.apache.org/jira/browse/FLINK-19133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.1
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.12.0, 1.11.2
>
>
> Reported in the ML: 
> https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E
> If a user provides a partitioner in combination with a SerializationSchema, it 
> is not initialized correctly and has no access to the parallel instance index 
> or the number of parallel instances.
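The missing initialization can be illustrated with a small pure-Python sketch (hypothetical names; the real FlinkKafkaPartitioner is a Java class whose open() method receives the subtask index and parallelism):

```python
class StickySubtaskPartitioner:
    """Toy stand-in for a user-provided Kafka partitioner.

    This only models the open()/partition() lifecycle described in the
    issue; it is not Flink's actual API.
    """

    def __init__(self):
        self.parallel_instance_id = None
        self.parallel_instances = None

    def open(self, parallel_instance_id, parallel_instances):
        # The framework must call this before partition() is ever invoked.
        self.parallel_instance_id = parallel_instance_id
        self.parallel_instances = parallel_instances

    def partition(self, record, num_partitions):
        if self.parallel_instance_id is None:
            raise RuntimeError(
                "partitioner was not opened: no access to the "
                "parallel instance index")
        # Pin each subtask to one partition, round-robin over partitions.
        return self.parallel_instance_id % num_partitions


# Correctly initialized: open() is called before partition().
p = StickySubtaskPartitioner()
p.open(parallel_instance_id=2, parallel_instances=4)
assert p.partition("record", num_partitions=3) == 2

# The reported bug: with a SerializationSchema the open() call was skipped,
# so the first partition() call fails (or, in Flink, misbehaves silently).
broken = StickySubtaskPartitioner()
try:
    broken.partition("record", num_partitions=3)
    assert False, "expected failure"
except RuntimeError:
    pass
```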



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] klion26 commented on a change in pull request #13279: [FLINK-19090][docs-zh] Translate "Local Cluster" of "Clusters & Depolyment" page into Chinese

2020-09-03 Thread GitBox


klion26 commented on a change in pull request #13279:
URL: https://github.com/apache/flink/pull/13279#discussion_r483402111



##
File path: docs/ops/deployment/local.zh.md
##
@@ -122,38 +121,38 @@ INFO ... - Recovering all persisted jobs.
 INFO ... - Registering TaskManager ... at ResourceManager
 {% endhighlight %}
 
- Windows Cygwin Users
+ Windows Cygwin 用户
 
-If you are installing Flink from the git repository and you are using the 
Windows git shell, Cygwin can produce a failure similar to this one:
+如果你使用 Windows 的 git shell 从 git 仓库安装 Flink,Cygwin 可能会产生类似如下的错误:
 
 {% highlight bash %}
 c:/flink/bin/start-cluster.sh: line 30: $'\r': command not found
 {% endhighlight %}
 
-This error occurs because git is automatically transforming UNIX line endings 
to Windows style line endings when running in Windows. The problem is that 
Cygwin can only deal with UNIX style line endings. The solution is to adjust 
the Cygwin settings to deal with the correct line endings by following these 
three steps:
+这个错误是因为当 git 运行在 Windows 上时,它会自动地将 UNIX 的行结束符转换成 Windows 风格的行结束符,但是 Cygwin 
只能处理 Unix 风格的行结束符。解决方案是通过以下三步调整 Cygwin 的配置,使其能够正确处理行结束符:
 
-1. Start a Cygwin shell.
+1. 启动 Cygwin shell。
 
-2. Determine your home directory by entering
+2. 输入以下命令进入你的家目录
 
 {% highlight bash %}
 cd; pwd
 {% endhighlight %}
 
-This will return a path under the Cygwin root path.
+这个命令会在 Cygwin 的根目录下返回路径地址。
 
-3. Using NotePad, WordPad or a different text editor open the file 
`.bash_profile` in the home directory and append the following: (If the file 
does not exist you will have to create it)
+3. 使用 NotePad、WordPad 或其他的文本编辑器在家目录下打开 `.bash_profile` 
文件并添加以下内容:(如果这个文件不存在则需要创建)
 
 {% highlight bash %}
 export SHELLOPTS
 set -o igncr
 {% endhighlight %}
 
-Save the file and open a new bash shell.
+保存文件并打开一个新的 bash shell。
 
-### Stop a Local Flink Cluster
+### 关闭 Flink 本地集群

Review comment:
   Personally I prefer to add `` tags for all titles.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-03 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-19108.

Fix Version/s: (was: 1.12.0)
 Assignee: Danny Chen
   Resolution: Fixed

master: fb29aa22e29d62ce0bb58befa969e6eb09af

release-1.11: f2cc139d99e2ac08182c7203f63f83168a821c44

> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned an alias by the system with the 
> prefix "EXPR$1". In the `Expander`, we then replace the ids in the inner query 
> with this prefix, which is wrong because we no longer add the alias to the 
> inner query.
> To solve the problem, skip expanding ids with the "EXPR$" prefix, just like 
> {{SqlUtil#deriveAliasFromOrdinal}} adds it.
> This was introduced by FLINK-18750.
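A toy model of the proposed fix, in pure Python (hypothetical names, not the actual Expander code): identifiers whose scope alias carries the system-generated "EXPR$" prefix are left unexpanded.

```python
# System-generated aliases use this prefix, in the style of
# SqlUtil#deriveAliasFromOrdinal ("EXPR$0", "EXPR$1", ...).
SYSTEM_ALIAS_PREFIX = "EXPR$"

def expand_identifier(identifier, scope_alias):
    """Qualify an identifier with its scope alias, unless the alias was
    assigned by the system (in which case no alias exists in the query
    text, so expansion would produce an invalid reference)."""
    if scope_alias.startswith(SYSTEM_ALIAS_PREFIX):
        return identifier  # skip expansion for system-aliased scopes
    return f"{scope_alias}.{identifier}"

# Inner query aliased by the system: left untouched.
assert expand_identifier("rowNum", "EXPR$1") == "rowNum"
# User-named scope: expanded as before.
assert expand_identifier("f0", "source") == "source.f0"
```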





[GitHub] [flink] JingsongLi merged pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…

2020-09-03 Thread GitBox


JingsongLi merged pull request #13293:
URL: https://github.com/apache/flink/pull/13293


   







[GitHub] [flink] flinkbot commented on pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API

2020-09-03 Thread GitBox


flinkbot commented on pull request #13325:
URL: https://github.com/apache/flink/pull/13325#issuecomment-686925091


   
   ## CI report:
   
   * 26abf1ecf936d0232c6d49c0bdd4fe3beae6252e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink-statefun] tzulitai commented on pull request #142: [FLINK-19130] [core] Expose metrics for backpressure

2020-09-03 Thread GitBox


tzulitai commented on pull request #142:
URL: https://github.com/apache/flink-statefun/pull/142#issuecomment-686920891


   Discussion point:
   
   Strictly speaking, the per-operator `inflight-async-ops` metric is 
redundant, since most metric reporter systems commonly used by Flink 
(Prometheus / InfluxDB etc.) have a query language for aggregating metrics.
   Therefore, the per-operator `inflight-async-ops` metric can be aggregated 
from the per-function ones.
   
   @igalshilman what do you think about this?







[GitHub] [flink-statefun] tzulitai opened a new pull request #142: [FLINK-19130] [core] Expose metrics for backpressure

2020-09-03 Thread GitBox


tzulitai opened a new pull request #142:
URL: https://github.com/apache/flink-statefun/pull/142


   This PR adds in total 2 new metrics related to backpressure:
   - Number of blocked addresses (per function type)
   - Number of inflight async operations (per function type + per operator)
   
   We also decided not to add the following metric, since after some discussion 
it doesn't seem to add much value:
   - Number of records accumulated pre-flight in a batch, per function type.
   This was not added, under the assumption that users really only care that 
some address has reached the maximum request batch size and is being blocked.
   
   ---
   
   ## Verification
   
   I verified this by running the Python Greeter example, with the following 
modifications to let backpressure happen more easily:
   - Maximum batch size = 1
   - No delay between each generated message to have maximum input rate
   
   You can see the following metric charts in the Flink Web UI:
   
   
![image](https://user-images.githubusercontent.com/5284370/92202926-a0f19e00-eeb2-11ea-946d-cbae5bb82260.png)

   ---
   
   ## Brief changelog
   
   - 78cbc19 Extends the `FunctionTypeMetrics` interface to include the new 
metrics, and adds a new `FunctionDispatcherMetrics` interface for per-operator 
metrics.
   - 6408866 Introduce a scoped-down interface `FunctionTypeMetricsRepository` 
and let `StatefulFunctionsRepository` extend it. Components that need to access 
function metrics will be passed this interface.
   - 0fbb17a preliminary extension to `ObjectContainer` DI utility so that we 
can share same instance across different object labels.
   - 6c77ca4 Wire-in the new metrics in `AsyncSink` to expose backpressure 
metrics.







[jira] [Updated] (FLINK-19130) Expose backpressure metrics / logs for function dispatcher operator

2020-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19130:
---
Labels: pull-request-available  (was: )

> Expose backpressure metrics / logs for function dispatcher operator
> ---
>
> Key: FLINK-19130
> URL: https://issues.apache.org/jira/browse/FLINK-19130
> Project: Flink
>  Issue Type: Task
>  Components: Stateful Functions
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>
> As of now, there is no visibility on why or how backpressure is applied in 
> Stateful Functions.
> This JIRA attempts to add two metrics as an initial effort to provide more 
> visibility:
> - Total number of addresses that have asked to be blocked
> - Total number of inflight pending async operations
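The bookkeeping behind the two proposed gauges can be sketched in pure Python (hypothetical names; the real metrics would be registered through Flink's metric group API):

```python
import threading

class BackpressureMetrics:
    """Toy sketch of the two proposed backpressure gauges."""

    def __init__(self):
        self._lock = threading.Lock()
        self.blocked_addresses = 0   # addresses that asked to be blocked
        self.inflight_async_ops = 0  # pending async operations

    def on_address_blocked(self):
        with self._lock:
            self.blocked_addresses += 1

    def on_address_unblocked(self):
        with self._lock:
            self.blocked_addresses -= 1

    def on_async_op_started(self):
        with self._lock:
            self.inflight_async_ops += 1

    def on_async_op_completed(self):
        with self._lock:
            self.inflight_async_ops -= 1


m = BackpressureMetrics()
m.on_async_op_started()
m.on_async_op_started()
m.on_async_op_completed()
m.on_address_blocked()
assert m.inflight_async_ops == 1
assert m.blocked_addresses == 1
```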





[GitHub] [flink] flinkbot commented on pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API

2020-09-03 Thread GitBox


flinkbot commented on pull request #13325:
URL: https://github.com/apache/flink/pull/13325#issuecomment-686918964


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 26abf1ecf936d0232c6d49c0bdd4fe3beae6252e (Fri Sep 04 
05:34:06 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[GitHub] [flink] dianfu opened a new pull request #13325: [FLINK-15974][python] Support to use the Python UDF directly in the Python Table API

2020-09-03 Thread GitBox


dianfu opened a new pull request #13325:
URL: https://github.com/apache/flink/pull/13325


   
   ## What is the purpose of the change
   
   *This pull request adds support for using a Python UDF directly in the Python 
Table API. E.g. for a Python UDF inc, users can use it directly in the Python 
Table API: tab.select(inc(tab.a)).insert_into("sink")*
   
   
   ## Verifying this change
   
   This change is already covered by the updated existing tests (test_udf.py, 
test_pandas_udf.py, etc.).
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (Will add documentation in a 
separate PR)
   







[jira] [Updated] (FLINK-15974) Support to use the Python UDF directly in the Python Table API

2020-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15974:
---
Labels: pull-request-available  (was: )

> Support to use the Python UDF directly in the Python Table API
> --
>
> Key: FLINK-15974
> URL: https://issues.apache.org/jira/browse/FLINK-15974
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, a Python UDF has to be registered before it can be used in the 
> Python Table API, e.g.
> {code}
> t_env.register_function("inc", inc)
> table.select("inc(id)") \
>  .insert_into("sink")
> {code}
> It would be great if we could support using a Python UDF directly in the 
> Python Table API, e.g.
> {code}
> table.select(inc("id")) \
>  .insert_into("sink")
> {code}
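The mechanics behind calling the UDF object directly can be sketched with a toy decorator in pure Python (toy classes, not PyFlink's actual implementation): the wrapper makes the decorated function callable on column expressions, so the call builds a call expression instead of evaluating the function.

```python
class Expression:
    """Toy column/call expression that just carries its text."""

    def __init__(self, text):
        self.text = text

    def __repr__(self):
        return self.text


def udf(fn):
    """Toy counterpart of a UDF decorator: wrap a function so that
    calling it on expressions yields a call expression, which a Table
    API could translate, with no explicit register_function() step."""

    class WrappedUDF:
        def __call__(self, *args):
            arg_list = ", ".join(str(a) for a in args)
            return Expression(f"{fn.__name__}({arg_list})")

    return WrappedUDF()


@udf
def inc(x):
    return x + 1

# inc is now usable directly in an API-style call:
expr = inc(Expression("id"))
assert expr.text == "inc(id)"
```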





[jira] [Assigned] (FLINK-15974) Support to use the Python UDF directly in the Python Table API

2020-09-03 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-15974:
---

Assignee: Dian Fu  (was: Shuiqiang Chen)

> Support to use the Python UDF directly in the Python Table API
> --
>
> Key: FLINK-15974
> URL: https://issues.apache.org/jira/browse/FLINK-15974
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.12.0
>
>
> Currently, a Python UDF has to be registered before it can be used in the 
> Python Table API, e.g.
> {code}
> t_env.register_function("inc", inc)
> table.select("inc(id)") \
>  .insert_into("sink")
> {code}
> It would be great if we could support using a Python UDF directly in the 
> Python Table API, e.g.
> {code}
> table.select(inc("id")) \
>  .insert_into("sink")
> {code}





[GitHub] [flink] klion26 commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-09-03 Thread GitBox


klion26 commented on a change in pull request #13225:
URL: https://github.com/apache/flink/pull/13225#discussion_r483392234



##
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/Fetch.java
##
@@ -0,0 +1,13 @@
+package org.apache.flink.table.api;

Review comment:
   please revert these changes









[GitHub] [flink] klion26 commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-09-03 Thread GitBox


klion26 commented on a change in pull request #13225:
URL: https://github.com/apache/flink/pull/13225#discussion_r483392129



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java
##
@@ -121,9 +121,9 @@ private SortQueryOperation 
validateAndGetChildSort(QueryOperation child, PostRes
previousSort = (SortQueryOperation) 
createSort(Collections.emptyList(), child, postResolverFactory);

Review comment:
   could you please revert these unrelated changes?









[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13015:
URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783


   
   ## CI report:
   
   * 8aac0b56736097238b12499b837ed010fcbf7d15 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6186)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13015:
URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783


   
   ## CI report:
   
   * 820de7186da99bece7f43b5300781d7b3a2dbe41 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5482)
 
   * 8aac0b56736097238b12499b837ed010fcbf7d15 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6186)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13322:
URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976


   
   ## CI report:
   
   * ef15d07af7eff785478fb42a3cfb67a7463aeb06 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6183)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13015:
URL: https://github.com/apache/flink/pull/13015#issuecomment-665526783


   
   ## CI report:
   
   * 820de7186da99bece7f43b5300781d7b3a2dbe41 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5482)
 
   * 8aac0b56736097238b12499b837ed010fcbf7d15 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13324:
URL: https://github.com/apache/flink/pull/13324#issuecomment-686875157


   
   ## CI report:
   
   * 04c250f4f16667c122c12e4b793204337d204c59 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6185)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13323:
URL: https://github.com/apache/flink/pull/13323#issuecomment-686869045


   
   ## CI report:
   
   * 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6184)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot commented on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error

2020-09-03 Thread GitBox


flinkbot commented on pull request #13324:
URL: https://github.com/apache/flink/pull/13324#issuecomment-686875157


   
   ## CI report:
   
   * 04c250f4f16667c122c12e4b793204337d204c59 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] JingsongLi commented on a change in pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down

2020-09-03 Thread GitBox


JingsongLi commented on a change in pull request #13306:
URL: https://github.com/apache/flink/pull/13306#discussion_r483360115



##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
return orcProperties;
}
 
+   private boolean isUnaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+   }
+
+   private boolean isBinaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+   (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+   }
+
+   public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+   if (expression instanceof CallExpression) {
+   CallExpression callExp = (CallExpression) expression;
+   FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+   if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {
+   if (!isUnaryValid(callExp)) {
+   // not a valid predicate
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+   return null;
+   }
+
+   PredicateLeaf.Type colType = 
toOrcType(((FieldReferenceExpression) 
callExp.getChildren().get(0)).getOutputDataType());
+   if (colType == null) {
+   // unsupported type
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcTableSource.", callExp);
+   return null;
+   }
+
+   String colName = getColumnName(callExp);
+
+   if (funcDef == 
BuiltInFunctionDefinitions.IS_NULL) {
+   return new 
OrcSplitReader.IsNull(colName, colType);
+   } else if (funcDef == 
BuiltInFunctionDefinitions.IS_NOT_NULL) {
+   return new OrcSplitReader.Not(
+   new 
OrcSplitReader.IsNull(colName, colType));
+   } else {
+   OrcSplitReader.Predicate c = 
toOrcPredicate(callExp.getChildren().get(0));
+   if (c == null) {
+   return null;
+   } else {
+   return new 
OrcSplitReader.Not(c);
+   }
+   }
+   } else if (funcDef == BuiltInFunctionDefinitions.OR) {
+   if (callExp.getChildren().size() < 2) {
+   return null;
+   }
+   Expression left = callExp.getChildren().get(0);
+   Expression right = callExp.getChildren().get(1);
+
+   OrcSplitReader.Predicate c1 = 
toOrcPredicate(left);
+   OrcSplitReader.Predicate c2 = 
toOrcPredicate(right);
+   if (c1 == null || c2 == null) {
+   return null;
+   } else {
+   return new OrcSplitReader.Or(c1, c2);
+   }
+   } else {
+   if (!isBinaryValid(callExp)) {
+   // not a valid predicate
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+   return null;
+   }
+
+   PredicateLeaf.Type litType = 
getLiteralType(callExp);
+   if (litType == null) {
+   // unsupported literal type
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into 

[GitHub] [flink] JingsongLi commented on a change in pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down

2020-09-03 Thread GitBox


JingsongLi commented on a change in pull request #13306:
URL: https://github.com/apache/flink/pull/13306#discussion_r483358009



##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
return orcProperties;
}
 
+   private boolean isUnaryValid(CallExpression callExpression) {

Review comment:
   Can you move these codes to a class: `OrcFilters`

##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
return orcProperties;
}
 
+   private boolean isUnaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+   }
+
+   private boolean isBinaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+   (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+   }
+
+   public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+   if (expression instanceof CallExpression) {
+   CallExpression callExp = (CallExpression) expression;
+   FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+   if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {

Review comment:
   Maybe we can have a `static final ImmutableMap> FILTERS`. The functional 
style can make the code better.
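The suggested table-driven ("functional style") dispatch can be sketched in pure Python (toy predicate representation, not the actual OrcSplitReader classes): a static map from function name to predicate builder replaces the long if/else chain.

```python
# Toy predicate builders: each maps (column_name, column_type) to a
# simple tuple standing in for an ORC predicate object.
def build_is_null(col, typ):
    return ("IS_NULL", col, typ)

def build_is_not_null(col, typ):
    # IS NOT NULL is expressed as NOT(IS NULL), as in the diff above.
    return ("NOT", ("IS_NULL", col, typ))

# Static dispatch table, in the spirit of the suggested ImmutableMap.
FILTERS = {
    "IS_NULL": build_is_null,
    "IS_NOT_NULL": build_is_not_null,
}

def to_orc_predicate(func_name, col, typ):
    """Look up the builder for a function; None means the predicate is
    unsupported and cannot be pushed down."""
    builder = FILTERS.get(func_name)
    if builder is None:
        return None
    return builder(col, typ)


assert to_orc_predicate("IS_NULL", "f0", "LONG") == ("IS_NULL", "f0", "LONG")
assert to_orc_predicate("LIKE", "f0", "STRING") is None
```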

##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
return orcProperties;
}
 
+   private boolean isUnaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+   }
+
+   private boolean isBinaryValid(CallExpression callExpression) {
+   return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+   (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+   }
+
+   public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+   if (expression instanceof CallExpression) {
+   CallExpression callExp = (CallExpression) expression;
+   FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+   if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {
+   if (!isUnaryValid(callExp)) {
+   // not a valid predicate
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+   return null;
+   }
+
+   PredicateLeaf.Type colType = 
toOrcType(((FieldReferenceExpression) 
callExp.getChildren().get(0)).getOutputDataType());
+   if (colType == null) {
+   // unsupported type
+   LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcTableSource.", callExp);
+   return null;
+   }
+
+   String colName = getColumnName(callExp);
+
+   if (funcDef == 
BuiltInFunctionDefinitions.IS_NULL) {
+   return new 
OrcSplitReader.IsNull(colName, colType);
+   } else if (funcDef == 
BuiltInFunctionDefinitions.IS_NOT_NULL) {
+   return new OrcSplitReader.Not(
+   new 
OrcSplitReader.IsNull(colName, colType));
+   } else {
+  

[jira] [Closed] (FLINK-14087) throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using RebalancePartitioner.

2020-09-03 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-14087.
---

> throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using 
> RebalancePartitioner. 
> ---
>
> Key: FLINK-14087
> URL: https://issues.apache.org/jira/browse/FLINK-14087
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Network
>Affects Versions: 1.8.0, 1.8.1, 1.9.0
>Reporter: luojiangyu
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-09-16-19-14-39-403.png, 
> image-2019-09-16-19-15-34-639.png
>
>
> There is a condition where multiple RecordWriter instances share the same 
> ChannelSelector instance.
> When two RecordWriter instances share one ChannelSelector instance, a 
> java.lang.ArrayIndexOutOfBoundsException may be thrown. For example, suppose 
> two RecordWriter instances share one RebalancePartitioner instance: the 
> partitioner is set up with 2 channels when the first RecordWriter 
> initializes, and then the same partitioner instance is set up with 3 channels 
> when the second RecordWriter initializes. The first RecordWriter then throws 
> an ArrayIndexOutOfBoundsException when it emits data.
> The exception looks like:
> |java.lang.RuntimeException: 2 at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  at 
> com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107) at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.ArrayIndexOutOfBoundsException: 2 at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>  ... 14 more|
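The failure mode described above can be reproduced with a minimal, self-contained sketch. The classes below are hypothetical stand-ins that only mimic the shape of Flink's RecordWriter/ChannelSelector interaction; they are not the actual Flink code.

```java
// Hypothetical minimal sketch of the shared-partitioner bug: the second
// writer's setup() overwrites the channel count seen by the first writer.
public class SharedPartitionerBug {

    /** Stand-in for a rebalancing ChannelSelector (round-robin). */
    static class RebalancePartitioner {
        int nextChannel = -1;
        int numberOfChannels;

        void setup(int numberOfChannels) {
            // Called once per RecordWriter; a later call silently overwrites
            // the channel count seen by earlier writers.
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    /** Stand-in for a RecordWriter with one buffer slot per output channel. */
    static class RecordWriter {
        final Object[] bufferBuilders;
        final RebalancePartitioner partitioner;

        RecordWriter(RebalancePartitioner partitioner, int channels) {
            this.partitioner = partitioner;
            this.bufferBuilders = new Object[channels];
            partitioner.setup(channels);
        }

        void emit() {
            // Index comes from the shared partitioner, which may cycle over
            // MORE channels than this writer actually has.
            bufferBuilders[partitioner.selectChannel()] = new Object();
        }
    }

    public static void main(String[] args) {
        RebalancePartitioner shared = new RebalancePartitioner();
        RecordWriter first = new RecordWriter(shared, 2);  // 2 channels
        new RecordWriter(shared, 3);                       // resets setup to 3 channels
        first.emit();
        first.emit();
        try {
            first.emit();  // partitioner selects channel 2, but the array length is 2
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("reproduced: " + e);
        }
    }
}
```

The fix merged for this ticket removes exactly this kind of sharing, so each writer sees a consistently sized channel range.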



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14087) throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using RebalancePartitioner.

2020-09-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190507#comment-17190507
 ] 

Yun Gao commented on FLINK-14087:
-

fix in master: d7fe9af0b6caa1c76e811240e53434969be771b1






[jira] [Resolved] (FLINK-14087) throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using RebalancePartitioner.

2020-09-03 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao resolved FLINK-14087.
-
Resolution: Fixed






[GitHub] [flink] flinkbot commented on pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error

2020-09-03 Thread GitBox


flinkbot commented on pull request #13324:
URL: https://github.com/apache/flink/pull/13324#issuecomment-686872257


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 04c250f4f16667c122c12e4b793204337d204c59 (Fri Sep 04 
03:02:12 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment

2020-09-03 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190505#comment-17190505
 ] 

Danny Chen commented on FLINK-19127:


Thanks, indeed, notebooks like Zeppelin may need this ~

> Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment 
> for TableEnvironment
> 
>
> Key: FLINK-19127
> URL: https://issues.apache.org/jira/browse/FLINK-19127
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> Connecting to a remote cluster from the unified TableEnvironment is neither 
> tested nor documented. Since StreamExecutionEnvironment is not necessary 
> anymore, users should be able to do the same in TableEnvironment easily. This 
> is in particular useful for interactive sessions that run in an IDE.





[GitHub] [flink] haveanote opened a new pull request #13324: [hotfix][docs] Fix ExecutionEnvironment.scala doc error

2020-09-03 Thread GitBox


haveanote opened a new pull request #13324:
URL: https://github.com/apache/flink/pull/13324


   
   
   ## What is the purpose of the change
   
   *There is a doc error in ExecutionEnvironment.readTextFileWithValue. Change 
utf-0 to utf-8*
   
   
   ## Brief change log
   
 - *Change utf-0 to utf-8*
- *line 236 @param charsetName The name of the character set used to 
read the file. Default is UTF-8*
 
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   







[jira] [Created] (FLINK-19138) Python UDF supports directly specifying input_types as DataTypes.ROW

2020-09-03 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-19138:


 Summary: Python UDF supports directly specifying input_types as 
DataTypes.ROW
 Key: FLINK-19138
 URL: https://issues.apache.org/jira/browse/FLINK-19138
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.12.0


Python UDF supports input_types=DataTypes.ROW 





[GitHub] [flink] flinkbot commented on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1

2020-09-03 Thread GitBox


flinkbot commented on pull request #13323:
URL: https://github.com/apache/flink/pull/13323#issuecomment-686869045


   
   ## CI report:
   
   * 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot commented on pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1

2020-09-03 Thread GitBox


flinkbot commented on pull request #13323:
URL: https://github.com/apache/flink/pull/13323#issuecomment-686867076


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 084ec77b7f7b01dc6314797cb3fa0aa2455432f7 (Fri Sep 04 
02:43:36 UTC 2020)
   
   **Warnings:**
* **1 pom.xml files were touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-19137).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[jira] [Commented] (FLINK-19137) Bump Apache Parquet to 1.11.1

2020-09-03 Thread ABC (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190497#comment-17190497
 ] 

ABC commented on FLINK-19137:
-

[https://github.com/apache/flink/pull/13323]

> Bump Apache Parquet to 1.11.1
> -
>
> Key: FLINK-19137
> URL: https://issues.apache.org/jira/browse/FLINK-19137
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: ABC
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2020-09-04-10-21-00-688.png, 
> image-2020-09-04-10-24-42-480.png
>
>
> Apache Parquet 1.11.1 fixed some important issues:
>  * https://issues.apache.org/jira/browse/PARQUET-1309
>  * https://issues.apache.org/jira/browse/PARQUET-1510
>  * https://issues.apache.org/jira/browse/PARQUET-1485
> The Flink master branch currently relies on Parquet 1.10.0, and the 
> flink-sql-parquet artifact shades the parquet class files into 
> flink-sql-parquet.jar. This may lead to the direct memory leak from 
> PARQUET-1485, the parquet properties bug from PARQUET-1309, or the repeated 
> values with dictionary encoding from PARQUET-1510.
>  
> For example in PARQUET-1309:
> !image-2020-09-04-10-21-00-688.png!
> then in Flink:
> [https://github.com/C08061/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java#L166]
> !image-2020-09-04-10-24-42-480.png!
>  
>  





[GitHub] [flink] C08061 opened a new pull request #13323: FLINK-19137: Bump Parquet from 1.10.0 to 1.11.1

2020-09-03 Thread GitBox


C08061 opened a new pull request #13323:
URL: https://github.com/apache/flink/pull/13323


   https://issues.apache.org/jira/browse/FLINK-19137







[jira] [Updated] (FLINK-19137) Bump Apache Parquet to 1.11.1

2020-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19137:
---
Labels: pull-request-available  (was: )






[GitHub] [flink] flinkbot edited a comment on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13322:
URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976


   
   ## CI report:
   
   * ef15d07af7eff785478fb42a3cfb67a7463aeb06 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6183)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Created] (FLINK-19137) Bump Apache Parquet to 1.11.1

2020-09-03 Thread ABC (Jira)
ABC created FLINK-19137:
---

 Summary: Bump Apache Parquet to 1.11.1
 Key: FLINK-19137
 URL: https://issues.apache.org/jira/browse/FLINK-19137
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: ABC
 Attachments: image-2020-09-04-10-21-00-688.png, 
image-2020-09-04-10-24-42-480.png

Apache Parquet 1.11.1 fixed some important issues:
 * https://issues.apache.org/jira/browse/PARQUET-1309
 * https://issues.apache.org/jira/browse/PARQUET-1510
 * https://issues.apache.org/jira/browse/PARQUET-1485

The Flink master branch currently relies on Parquet 1.10.0, and the 
flink-sql-parquet artifact shades the parquet class files into 
flink-sql-parquet.jar. This may lead to the direct memory leak from 
PARQUET-1485, the parquet properties bug from PARQUET-1309, or the repeated 
values with dictionary encoding from PARQUET-1510.

 

For example in PARQUET-1309:

!image-2020-09-04-10-21-00-688.png!

then in Flink:

[https://github.com/C08061/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java#L166]

!image-2020-09-04-10-24-42-480.png!

 

 





[GitHub] [flink-statefun] tzulitai closed pull request #141: [FLINK-19129] [k8s] Update log4j-console in template Helm chart

2020-09-03 Thread GitBox


tzulitai closed pull request #141:
URL: https://github.com/apache/flink-statefun/pull/141


   







[GitHub] [flink] lirui-apache commented on pull request #13315: [FLINK-19070][hive] Hive connector should throw a meaningful exception if user reads/writes ACID tables

2020-09-03 Thread GitBox


lirui-apache commented on pull request #13315:
URL: https://github.com/apache/flink/pull/13315#issuecomment-686860019


   @SteNicholas Thanks for working on this. I think the check should be 
enforced when getting the source/sink for a Hive table, rather than when 
creating the Hive table itself.







[GitHub] [flink] flinkbot commented on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.

2020-09-03 Thread GitBox


flinkbot commented on pull request #13322:
URL: https://github.com/apache/flink/pull/13322#issuecomment-686859976


   
   ## CI report:
   
   * ef15d07af7eff785478fb42a3cfb67a7463aeb06 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] tzulitai commented on a change in pull request #13015: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

2020-09-03 Thread GitBox


tzulitai commented on a change in pull request #13015:
URL: https://github.com/apache/flink/pull/13015#discussion_r483345056



##
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutRecordPublisherConfiguration {
+
+   /**
+* The efo registration type for de-/registration of streams.
+*/
+   private final EFORegistrationType efoRegistrationType;
+
+   /**
+* The efo stream consumer name. Should not be Null if the 
efoRegistrationType is either LAZY or EAGER.
+*/
+   @Nullable
+   private String consumerName;
+
+   /**
+* The manual set efo consumer arns for each stream. Should not be Null 
if the efoRegistrationType is NONE
+*/
+   @Nullable
+   private Map streamConsumerArns;
+
+   /**
+* Base backoff millis for the deregister stream operation.
+*/
+   private final int subscribeToShardMaxRetries;
+
+   /**
+* Maximum backoff millis for the subscribe to shard operation.
+*/
+   private final long subscribeToShardMaxBackoffMillis;
+
+   /**
+* Base backoff millis for the subscribe to shard operation.
+*/
+   private final long subscribeToShardBaseBackoffMillis;
+
+   /**
+* Exponential backoff power constant for the subscribe to shard 
operation.
+*/
+   private final double subscribeToShardExpConstant;
+
+   /**
+* Base backoff millis for the register stream operation.
+*/
+   private final long registerStreamBaseBackoffMillis;
+
+   /**
+* Maximum backoff millis for the register stream operation.
+*/
+   private final long registerStreamMaxBackoffMillis;
+
+   /**
+* Exponential backoff power constant for the register stream operation.
+*/
+   private final double registerStreamExpConstant;
+
+   /**
+* Maximum retry attempts for the register stream operation.
+*/
+   private final int registerStreamMaxRetries;
+
+   /**
+* Base backoff millis for the deregister stream operation.
+*/
+   private final long deregisterStreamBaseBackoffMillis;
+
+   /**
+* Maximum backoff millis for the deregister stream operation.
+*/
+   private final long deregisterStreamMaxBackoffMillis;
+
+   /**
+* Exponential backoff power constant for the deregister stream 
operation.
+*/
+   private final double deregisterStreamExpConstant;
+
+   /**
+* Maximum retry attempts for the deregister stream operation.
+*/
+   private final int deregisterStreamMaxRetries;
+
+   /**
+* Max retries for the describe stream operation.
+*/
+   private final int describeStreamMaxRetries;
+
+   /**
+* Backoff millis for the describe stream operation.
+*/
+   private final long describeStreamBaseBackoffMillis;
+
+   /**
+* Maximum backoff millis for the describe stream operation.
+*/
+   private final long describeStreamMaxBackoffMillis;
+
+   /**
+* Exponential backoff power constant for the describe stream operation.
+*/
+   private final double 
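The fields above repeat one pattern per operation: a base backoff, a maximum backoff, an exponential constant, and a retry cap. Assuming a conventional capped exponential backoff (the actual Flink computation is not reproduced here, and all names below are illustrative), the delay for a given attempt can be sketched as:

```java
public class BackoffSketch {

    // Capped exponential backoff: base * expConstant^attempt, never above max.
    // Illustrative only; not the actual Flink implementation.
    static long backoffMillis(long baseMillis, long maxMillis, double expConstant, int attempt) {
        double candidate = baseMillis * Math.pow(expConstant, attempt);
        return (long) Math.min(candidate, (double) maxMillis);
    }

    public static void main(String[] args) {
        // e.g. base = 1000 ms, max = 30000 ms, exponential constant = 1.5
        System.out.println(backoffMillis(1000L, 30_000L, 1.5, 0));  // 1000
        System.out.println(backoffMillis(1000L, 30_000L, 1.5, 10)); // 30000
    }
}
```

A retry loop would call this with its attempt counter until the corresponding max-retries field is exhausted.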

[GitHub] [flink] flinkbot commented on pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.

2020-09-03 Thread GitBox


flinkbot commented on pull request #13322:
URL: https://github.com/apache/flink/pull/13322#issuecomment-686856023


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit ef15d07af7eff785478fb42a3cfb67a7463aeb06 (Fri Sep 04 
02:03:26 UTC 2020)
   
   **Warnings:**
* Documentation files were touched, but no `.zh.md` files: Update Chinese 
documentation or file Jira ticket.
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-17709).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17709) Active Kubernetes integration phase 3 - Advanced Features

2020-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-17709:
---
Labels: pull-request-available  (was: )

> Active Kubernetes integration phase 3 - Advanced Features
> -
>
> Key: FLINK-17709
> URL: https://issues.apache.org/jira/browse/FLINK-17709
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes, Runtime / Coordination
>Reporter: Canbin Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> This is the umbrella issue to track all the advanced features for phase 3 of 
> active Kubernetes integration in Flink 1.12.0. Some of the features are as 
> follows:
> # Support multiple JobManagers in ZooKeeper based HA setups.
> # Support user-specified pod templates.
> # Support FileSystem based high availability.
> # Support running PyFlink.
> # Support accessing secured services via K8s secrets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] shuiqiangchen opened a new pull request #13322: [FLINK-17709][kubernetes][docs] Support running PyFlink on Kubernetes.

2020-09-03 Thread GitBox


shuiqiangchen opened a new pull request #13322:
URL: https://github.com/apache/flink/pull/13322


   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't 
know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   







[jira] [Commented] (FLINK-18858) Kinesis Flink SQL Connector

2020-09-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190475#comment-17190475
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-18858:
-

[~danny.cranmer] I think a FLIP is not needed for the feature, since the 
addition should be fairly self-contained and does not affect much (if any) of 
the existing codebase.

> Kinesis Flink SQL Connector
> ---
>
> Key: FLINK-18858
> URL: https://issues.apache.org/jira/browse/FLINK-18858
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis, Table SQL / Ecosystem
>Reporter: Waldemar Hummer
>Priority: Major
>
> Hi all,
> as far as I can see in the [list of 
> connectors|https://github.com/apache/flink/tree/master/flink-connectors], we 
> have a 
> {{[flink-connector-kinesis|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis]}}
>  for *programmatic access* to Kinesis streams, but there does not yet seem to 
> exist a *Kinesis SQL connector* (something like 
> {{flink-sql-connector-kinesis}}, analogous to {{flink-sql-connector-kafka}}).
> Our use case would be to enable SQL queries with direct access to Kinesis 
> sources (and potentially sinks), to enable something like the following Flink 
> SQL queries:
> {code:java}
>  $ bin/sql-client.sh embedded
> ...
> Flink SQL> CREATE TABLE Orders(`user` string, amount int, rowtime TIME) WITH 
> ('connector' = 'kinesis', ...);
> ...
> Flink SQL> SELECT * FROM Orders ...;
> ...{code}
>  
> I was wondering if this is something that has been considered, or is already 
> actively being worked on? If one of you can provide some guidance, we may be 
> able to work on a PoC implementation to add this functionality.
>  
> (Wasn't able to find an existing issue in the backlog - if this is a 
> duplicate, then please let me know as well.)
> Thanks!





[jira] [Created] (FLINK-19136) MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the predicate within the allowed time"

2020-09-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-19136:
---

 Summary: MetricsAvailabilityITCase.testReporter failed with "Could 
not satisfy the predicate within the allowed time"
 Key: FLINK-19136
 URL: https://issues.apache.org/jira/browse/FLINK-19136
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.0
Reporter: Dian Fu


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6179=logs=9fca669f-5c5f-59c7-4118-e31c641064f0=91bf6583-3fb2-592f-e4d4-d79d79c3230a]

{code}
2020-09-03T23:33:18.3687261Z [ERROR] 
testReporter(org.apache.flink.metrics.tests.MetricsAvailabilityITCase)  Time 
elapsed: 15.217 s  <<< ERROR!
2020-09-03T23:33:18.3698260Z java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
satisfy the predicate within the allowed time.
2020-09-03T23:33:18.3698749Zat 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-09-03T23:33:18.3699163Zat 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
2020-09-03T23:33:18.3699754Zat 
org.apache.flink.metrics.tests.MetricsAvailabilityITCase.fetchMetric(MetricsAvailabilityITCase.java:162)
2020-09-03T23:33:18.3700234Zat 
org.apache.flink.metrics.tests.MetricsAvailabilityITCase.checkJobManagerMetricAvailability(MetricsAvailabilityITCase.java:116)
2020-09-03T23:33:18.3700726Zat 
org.apache.flink.metrics.tests.MetricsAvailabilityITCase.testReporter(MetricsAvailabilityITCase.java:101)
2020-09-03T23:33:18.3701097Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-09-03T23:33:18.3701425Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-09-03T23:33:18.3701798Zat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-09-03T23:33:18.3702146Zat 
java.lang.reflect.Method.invoke(Method.java:498)
2020-09-03T23:33:18.3702471Zat 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-09-03T23:33:18.3702866Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-09-03T23:33:18.3703253Zat 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-09-03T23:33:18.3703621Zat 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-09-03T23:33:18.3703997Zat 
org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-09-03T23:33:18.3704339Zat 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
2020-09-03T23:33:18.3704629Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2020-09-03T23:33:18.3704940Zat 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2020-09-03T23:33:18.3705354Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2020-09-03T23:33:18.3705725Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2020-09-03T23:33:18.3706072Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-09-03T23:33:18.3706397Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-09-03T23:33:18.3706714Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-09-03T23:33:18.3707044Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-09-03T23:33:18.3707373Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-09-03T23:33:18.3707708Zat 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-09-03T23:33:18.3708073Zat 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2020-09-03T23:33:18.3708410Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-09-03T23:33:18.3708691Zat 
org.junit.runners.Suite.runChild(Suite.java:128)
2020-09-03T23:33:18.3708976Zat 
org.junit.runners.Suite.runChild(Suite.java:27)
2020-09-03T23:33:18.3709273Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-09-03T23:33:18.3709579Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-09-03T23:33:18.3709910Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-09-03T23:33:18.3710242Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-09-03T23:33:18.3710554Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-09-03T23:33:18.3710875Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-09-03T23:33:18.3711203Zat 
org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
2020-09-03T23:33:18.3711585Zat 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
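The RetryException above is thrown when a polled predicate never holds within the allowed time. A minimal self-contained sketch of that retry-until-predicate pattern (names are illustrative, not the actual FutureUtils API):

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetrySketch {

    // Poll the supplier until the predicate accepts a value or the deadline passes.
    static <T> T retryUntil(Supplier<T> supplier, Predicate<T> predicate,
                            long timeoutMillis, long intervalMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            T value = supplier.get();
            if (predicate.test(value)) {
                return value;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("Could not satisfy the predicate within the allowed time.");
            }
            Thread.sleep(intervalMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] counter = {0};
        // Succeeds on the third poll, well within the 1 s deadline.
        int result = retryUntil(() -> ++counter[0], v -> v >= 3, 1_000L, 10L);
        System.out.println(result); // 3
    }
}
```

The test above fetches a metric in this fashion; the failure only means the predicate never held before the deadline.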

[jira] [Updated] (FLINK-19136) MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the predicate within the allowed time"

2020-09-03 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-19136:

Labels: test-stability  (was: )

> MetricsAvailabilityITCase.testReporter failed with "Could not satisfy the 
> predicate within the allowed time"
> 
>
> Key: FLINK-19136
> URL: https://issues.apache.org/jira/browse/FLINK-19136
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6179=logs=9fca669f-5c5f-59c7-4118-e31c641064f0=91bf6583-3fb2-592f-e4d4-d79d79c3230a]
> {code}
> 2020-09-03T23:33:18.3687261Z [ERROR] 
> testReporter(org.apache.flink.metrics.tests.MetricsAvailabilityITCase)  Time 
> elapsed: 15.217 s  <<< ERROR!
> 2020-09-03T23:33:18.3698260Z java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> satisfy the predicate within the allowed time.
> 2020-09-03T23:33:18.3698749Z  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-09-03T23:33:18.3699163Z  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> 2020-09-03T23:33:18.3699754Z  at 
> org.apache.flink.metrics.tests.MetricsAvailabilityITCase.fetchMetric(MetricsAvailabilityITCase.java:162)
> 2020-09-03T23:33:18.3700234Z  at 
> org.apache.flink.metrics.tests.MetricsAvailabilityITCase.checkJobManagerMetricAvailability(MetricsAvailabilityITCase.java:116)
> 2020-09-03T23:33:18.3700726Z  at 
> org.apache.flink.metrics.tests.MetricsAvailabilityITCase.testReporter(MetricsAvailabilityITCase.java:101)
> 2020-09-03T23:33:18.3701097Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-09-03T23:33:18.3701425Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-09-03T23:33:18.3701798Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-09-03T23:33:18.3702146Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-09-03T23:33:18.3702471Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-09-03T23:33:18.3702866Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-09-03T23:33:18.3703253Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-09-03T23:33:18.3703621Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-09-03T23:33:18.3703997Z  at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-09-03T23:33:18.3704339Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-09-03T23:33:18.3704629Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-09-03T23:33:18.3704940Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-09-03T23:33:18.3705354Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-09-03T23:33:18.3705725Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-09-03T23:33:18.3706072Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-09-03T23:33:18.3706397Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-09-03T23:33:18.3706714Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-09-03T23:33:18.3707044Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-09-03T23:33:18.3707373Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-09-03T23:33:18.3707708Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-09-03T23:33:18.3708073Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-09-03T23:33:18.3708410Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-09-03T23:33:18.3708691Z  at 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-09-03T23:33:18.3708976Z  at 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-09-03T23:33:18.3709273Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-09-03T23:33:18.3709579Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-09-03T23:33:18.3709910Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-09-03T23:33:18.3710242Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-09-03T23:33:18.3710554Z  at 
> 

[jira] [Commented] (FLINK-18815) AbstractCloseableRegistryTest.testClose unstable

2020-09-03 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190472#comment-17190472
 ] 

Dian Fu commented on FLINK-18815:
-

Another instance: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6180=logs=3b6ec2fd-a816-5e75-c775-06fb87cb6670=b33fdd4f-3de5-542e-2624-5d53167bb672]

> AbstractCloseableRegistryTest.testClose unstable
> 
>
> Key: FLINK-18815
> URL: https://issues.apache.org/jira/browse/FLINK-18815
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Tests
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Robert Metzger
>Assignee: Kezhu Wang
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5164=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> [ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.509 
> s <<< FAILURE! - in org.apache.flink.core.fs.SafetyNetCloseableRegistryTest
> [ERROR] testClose(org.apache.flink.core.fs.SafetyNetCloseableRegistryTest)  
> Time elapsed: 1.15 s  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<-1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.core.fs.AbstractCloseableRegistryTest.testClose(AbstractCloseableRegistryTest.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}





[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN
   * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Closed] (FLINK-19118) Support Expression in the operations of the Python Table API

2020-09-03 Thread sunjincheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-19118.
---
Resolution: Fixed

Fixed in Master: a8cc62a901dabe6c4d877b97db6024715b68174a

> Support Expression in the operations of the Python Table API
> 
>
> Key: FLINK-19118
> URL: https://issues.apache.org/jira/browse/FLINK-19118
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, it only supports string in the operations of the Python Table API. 
> For example:
> {code}
> >>> tab.group_by("key").select("key, value.avg")
> {code}
> After introducing the Expression class in FLINK-19114, it's possible to 
> support
> to use Expression in the operations in the Python Table API, e.g.
> {code}
> >>> tab.group_by(col("key")).select(col("key"), col("value").avg)
> {code}





[GitHub] [flink] sunjincheng121 merged pull request #13304: [FLINK-19118][python] Support Expression in the operations of Python Table API

2020-09-03 Thread GitBox


sunjincheng121 merged pull request #13304:
URL: https://github.com/apache/flink/pull/13304


   







[GitHub] [flink] sunjincheng121 commented on pull request #13304: [FLINK-19118][python] Support Expression in the operations of Python Table API

2020-09-03 Thread GitBox


sunjincheng121 commented on pull request #13304:
URL: https://github.com/apache/flink/pull/13304#issuecomment-686839232


   @flinkbot approve all







[jira] [Commented] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts

2020-09-03 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190447#comment-17190447
 ] 

Kenneth William Krugler commented on FLINK-19069:
-

See also [https://kb.databricks.com/data/append-slow-with-spark-2.0.0.html]

> finalizeOnMaster takes too much time and client timeouts
> 
>
> Key: FLINK-19069
> URL: https://issues.apache.org/jira/browse/FLINK-19069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0
>Reporter: Jiayi Liao
>Priority: Critical
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> Currently we execute {{finalizeOnMaster}} in JM's main thread, which may 
> block the JM for a very long time so that the client eventually times out. 
> For example, we'd like to write data to HDFS and commit files on JM, which 
> takes more than ten minutes to commit tens of thousands of files.
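The direction implied above (keeping the JM main thread responsive) can be sketched as offloading the slow finalization to an I/O executor and tracking completion with a future; all names here are illustrative, not the actual Flink fix:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FinalizeOffloadSketch {

    // Run a potentially slow finalize action off the calling thread and
    // expose its completion as a future. Illustrative sketch only.
    static CompletableFuture<Void> finalizeAsync(Runnable finalizeAction, Executor ioExecutor) {
        return CompletableFuture.runAsync(finalizeAction, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> done = finalizeAsync(
                () -> { /* e.g. commit tens of thousands of files to HDFS */ },
                io);
        done.join(); // a real JM main thread would register a callback instead of blocking
        io.shutdown();
        System.out.println("finalized");
    }
}
```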





[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178)
 
   * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN
   * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] sjwiesman commented on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-686819620


   You can apply this patch to SavepointWriterITCase to see the failure. 
   
[SavepointWriterITCase.patch.txt](https://github.com/apache/flink/files/5171892/SavepointWriterITCase.patch.txt)
   







[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483299944



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;

Review comment:
   This package is for user interfaces. This is an internal class and 
should be in the `output` package. Same with the other file. 

##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+/**
+ * Extracts all file paths that are part of the provided {@link OperatorState}.
+ */
+public class StatePathExtractor implements FlatMapFunction {

Review comment:
   Mark as @Internal
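The extractor above flat-maps one OperatorState into the file paths it references. Without the Flink state-handle classes at hand, the pattern can be illustrated with a self-contained analogue (all types below are stand-ins, not the real handles):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PathExtractorSketch {

    // Stand-in for a subtask state that references backing files.
    static class SubtaskStateStub {
        final List<String> filePaths;
        SubtaskStateStub(List<String> filePaths) { this.filePaths = filePaths; }
    }

    // Stand-in for an OperatorState holding one state per subtask.
    static class OperatorStateStub {
        final List<SubtaskStateStub> subtasks;
        OperatorStateStub(List<SubtaskStateStub> subtasks) { this.subtasks = subtasks; }
    }

    // flatMap-style extraction: one input element, many collected outputs.
    static void flatMap(OperatorStateStub state, List<String> collector) {
        for (SubtaskStateStub subtask : state.subtasks) {
            collector.addAll(subtask.filePaths);
        }
    }

    public static void main(String[] args) {
        OperatorStateStub state = new OperatorStateStub(Arrays.asList(
                new SubtaskStateStub(Arrays.asList("/cp/op-1/file-a", "/cp/op-1/file-b")),
                new SubtaskStateStub(Arrays.asList("/cp/op-2/file-c"))));
        List<String> paths = new ArrayList<>();
        flatMap(state, paths);
        System.out.println(paths); // [/cp/op-1/file-a, /cp/op-1/file-b, /cp/op-2/file-c]
    }
}
```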

##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * This mapper copies files from an existing savepoint into a new directory.
+ */
+public final class FileCopyMapFunction implements MapFunction {

Review comment:
   Internal class with public scope should be marked @Internal 

##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding 

[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150)
 
   * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178)
 
   * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN
   * 0ac70b7637256b8b4320e509c0649075c7c0aabf Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6181)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150)
 
   * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178)
 
   * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN
   * 0ac70b7637256b8b4320e509c0649075c7c0aabf UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150)
 
   * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6178)
 
   * 1d254fbbfe4ee9aa6c05bf9fcb44af3317c9274b UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13181:
URL: https://github.com/apache/flink/pull/13181#issuecomment-675091412


   
   ## CI report:
   
   * e7553689356882de1ffe606400d1255d1d757bc4 UNKNOWN
   * a96b2db52a0db507e0077266c8e9cb947413e1ba Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6176)
 
   
   




[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483220192



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */
+public class FileCopyRichMapFunction extends RichMapFunction {
+   // the destination path to copy files to
+   private String path;
+
+   public FileCopyRichMapFunction(String path) {
+   this.path = path;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   // create the parent dir only in the first subtask
+   if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review comment:
   Changed to `MapFunction`, as we do not guarantee that all `open()` calls are 
executed before all `map()` calls across all subtasks.
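
   The concern can be illustrated with a plain-Java sketch (class and method names here are illustrative, not the PR's actual code): rather than creating the destination directory in `open()` of one subtask, the copy step creates it lazily and idempotently on every call, so no ordering guarantee between subtasks is needed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: a stateless file-copy step that does not rely on
// open() having run in any subtask before map() runs in another.
public class LazyDirFileCopy {

	private final String destDir; // destination directory for copied files

	public LazyDirFileCopy(String destDir) {
		this.destDir = destDir;
	}

	// Copies one source file into destDir and returns the new path.
	// Files.createDirectories is idempotent, so every subtask can call it
	// safely; there is no need for a "first subtask creates the dir" rule.
	public String copy(String sourceFile) throws IOException {
		Path source = Paths.get(sourceFile);
		Path target = Paths.get(destDir).resolve(source.getFileName());
		Files.createDirectories(target.getParent());
		Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
		return target.toString();
	}
}
```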









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483216756



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
##
@@ -90,38 +93,37 @@ public final void write(String path) {
 
List existingOperators = 
metadata.getExistingOperators();
 
-   DataSet finalOperatorStates = 
unionOperatorStates(newOperatorStates, existingOperators);
-
-   finalOperatorStates
-   .reduceGroup(new 
MergeOperatorStates(metadata.getMasterStates()))
-   .name("reduce(OperatorState)")
-   .output(new SavepointOutputFormat(savepointPath))
-   .name(path);
-   }
-
-   private DataSet 
unionOperatorStates(DataSet newOperatorStates, 
List existingOperators) {
DataSet finalOperatorStates;
if (existingOperators.isEmpty()) {
finalOperatorStates = newOperatorStates;
} else {
-   DataSet wrappedCollection = 
newOperatorStates
-   .getExecutionEnvironment()
+   DataSet existingOperatorStates = 
newOperatorStates.getExecutionEnvironment()
.fromCollection(existingOperators);
 
-   finalOperatorStates = 
newOperatorStates.union(wrappedCollection);
+   existingOperatorStates
+   .flatMap(new StatePathExtractor())

Review comment:
   fixed









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483214542



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
##
@@ -90,38 +93,37 @@ public final void write(String path) {
 
List existingOperators = 
metadata.getExistingOperators();
 
-   DataSet finalOperatorStates = 
unionOperatorStates(newOperatorStates, existingOperators);
-
-   finalOperatorStates
-   .reduceGroup(new 
MergeOperatorStates(metadata.getMasterStates()))
-   .name("reduce(OperatorState)")
-   .output(new SavepointOutputFormat(savepointPath))
-   .name(path);
-   }
-
-   private DataSet 
unionOperatorStates(DataSet newOperatorStates, 
List existingOperators) {
DataSet finalOperatorStates;
if (existingOperators.isEmpty()) {
finalOperatorStates = newOperatorStates;
} else {
-   DataSet wrappedCollection = 
newOperatorStates
-   .getExecutionEnvironment()
+   DataSet existingOperatorStates = 
newOperatorStates.getExecutionEnvironment()
.fromCollection(existingOperators);
 
-   finalOperatorStates = 
newOperatorStates.union(wrappedCollection);
+   existingOperatorStates
+   .flatMap(new StatePathExtractor())

Review comment:
   I misunderstood. It is for `.flatMap(new StatePathExtractor())`, not for 
the file copy.









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483212926



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */
+public final class FileCopyRichMapFunction extends RichMapFunction {
+   // the destination path to copy files to
+   private final String path;
+
+   public FileCopyRichMapFunction(String path) {
+   this.path = path;

Review comment:
   Good point!









[GitHub] [flink] flinkbot edited a comment on pull request #13284: [FLINK-17016][runtime] Enable pipelined region scheduling

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13284:
URL: https://github.com/apache/flink/pull/13284#issuecomment-683683000


   
   ## CI report:
   
   * 71281ac4921c174c214f2393e169e7140698af2d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6177)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13309:
URL: https://github.com/apache/flink/pull/13309#issuecomment-685901869


   
   ## CI report:
   
   * fd94dc00ac4095094d0a3c84bd066b44ddd346a7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6150)
 
   * 7f8d6066b5e021ff8a343e2c43bd40d256d1a2c9 UNKNOWN
   
   




[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483212773



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+/**
+ * Extracts a set of state file paths from an OperatorState.
+ */
+public class StatePathExtractor implements FlatMapFunction {

Review comment:
   done.









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483211627



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */

Review comment:
   Looks better.









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483211813



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+/**
+ * Extracts a set of state file paths from an OperatorState.

Review comment:
   Looks better. 









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r48328



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
##
@@ -90,38 +93,37 @@ public final void write(String path) {
 
List existingOperators = 
metadata.getExistingOperators();
 
-   DataSet finalOperatorStates = 
unionOperatorStates(newOperatorStates, existingOperators);
-
-   finalOperatorStates
-   .reduceGroup(new 
MergeOperatorStates(metadata.getMasterStates()))
-   .name("reduce(OperatorState)")
-   .output(new SavepointOutputFormat(savepointPath))
-   .name(path);
-   }
-
-   private DataSet 
unionOperatorStates(DataSet newOperatorStates, 
List existingOperators) {
DataSet finalOperatorStates;
if (existingOperators.isEmpty()) {
finalOperatorStates = newOperatorStates;
} else {
-   DataSet wrappedCollection = 
newOperatorStates
-   .getExecutionEnvironment()
+   DataSet existingOperatorStates = 
newOperatorStates.getExecutionEnvironment()
.fromCollection(existingOperators);
 
-   finalOperatorStates = 
newOperatorStates.union(wrappedCollection);
+   existingOperatorStates
+   .flatMap(new StatePathExtractor())

Review comment:
   If we have a huge savepoint (e.g., in the TB range) to process, do we also 
run with parallelism=1? That would mean a single thread performs all the file 
copies, potentially a large number of them.
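
   To make the concern concrete, here is a self-contained sketch (names are illustrative, not from the PR) of copying a batch of state files with a worker pool, loosely analogous to running the copy operator with parallelism > 1: several files can be in flight at once instead of one thread draining the whole list.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: copying a batch of state files with a fixed-size
// worker pool, so large savepoints are not bottlenecked on one thread.
public class ParallelCopy {

	public static void copyAll(List<Path> sources, Path destDir, int parallelism)
			throws IOException, InterruptedException {
		Files.createDirectories(destDir);
		ExecutorService pool = Executors.newFixedThreadPool(parallelism);
		for (Path source : sources) {
			// each copy task runs independently on a pool thread
			pool.submit(() -> {
				try {
					Files.copy(source, destDir.resolve(source.getFileName()));
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			});
		}
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.MINUTES);
	}
}
```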









[GitHub] [flink] qinjunjerry commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


qinjunjerry commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483211312



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */
+public final class FileCopyRichMapFunction extends RichMapFunction {

Review comment:
   done.









[GitHub] [flink] flinkbot edited a comment on pull request #13321: [FLINK-14870] Remove nullable assumption of task slot sharing group

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13321:
URL: https://github.com/apache/flink/pull/13321#issuecomment-686567896


   
   ## CI report:
   
   * d79f152fa91b8bc555af6fb2a00a8d62b184be5a UNKNOWN
   * 7039064922aaec22752ac84e4e9d41d663e68a14 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6175)
 
   
   




[GitHub] [flink] sjwiesman commented on pull request #13179: [FLINK-18978][state-backends] Support full table scan of key and namespace from statebackend

2020-09-03 Thread GitBox


sjwiesman commented on pull request #13179:
URL: https://github.com/apache/flink/pull/13179#issuecomment-686698188


   @myasuka please take another look when you have the time







[jira] [Commented] (FLINK-18858) Kinesis Flink SQL Connector

2020-09-03 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190366#comment-17190366
 ] 

Danny Cranmer commented on FLINK-18858:
---

Thanks [~tzulitai]. Hello [~whummer], I am a member of the Kinesis team at AWS. 
It is great to see interest in SQL support for Kinesis streams. I would be 
happy to collaborate with you on this feature. Have you made any progress on 
your PoC so far? It would be good to arrange a call to discuss the way forward, 
feel free to drop me an email at cranm...@amazon.com. I look forward to hearing 
back!

[~tzulitai]/[~rmetzger] would you expect a FLIP for this, or is the Jira 
sufficient?

> Kinesis Flink SQL Connector
> ---
>
> Key: FLINK-18858
> URL: https://issues.apache.org/jira/browse/FLINK-18858
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis, Table SQL / Ecosystem
>Reporter: Waldemar Hummer
>Priority: Major
>
> Hi all,
> as far as I can see in the [list of 
> connectors|https://github.com/apache/flink/tree/master/flink-connectors], we 
> have a 
> {{[flink-connector-kinesis|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis]}}
>  for *programmatic access* to Kinesis streams, but there does not yet seem to 
> exist a *Kinesis SQL connector* (something like 
> {{flink-sql-connector-kinesis}}, analogous to {{flink-sql-connector-kafka}}).
> Our use case would be to enable SQL queries with direct access to Kinesis 
> sources (and potentially sinks), to enable something like the following Flink 
> SQL queries:
> {code:java}
>  $ bin/sql-client.sh embedded
> ...
> Flink SQL> CREATE TABLE Orders(`user` string, amount int, rowtime TIME) WITH 
> ('connector' = 'kinesis', ...);
> ...
> Flink SQL> SELECT * FROM Orders ...;
> ...{code}
>  
> I was wondering if this is something that has been considered, or is already 
> actively being worked on? If one of you can provide some guidance, we may be 
> able to work on a PoC implementation to add this functionality.
>  
> (Wasn't able to find an existing issue in the backlog - if this is a 
> duplicate, then please let me know as well.)
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly

2020-09-03 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190358#comment-17190358
 ] 

Kenneth William Krugler commented on FLINK-19133:
-

Yes, we ran into this exact issue recently with 1.11, where only partition 0 
was receiving data. "Fixed" it by passing Optional.empty() for the partition, 
so it would use Kafka partitioning vs. the FlinkFixedPartitioner, but good to 
see we weren't imagining things.

> User provided kafka partitioners are not initialized correctly
> --
>
> Key: FLINK-19133
> URL: https://issues.apache.org/jira/browse/FLINK-19133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.1
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.12.0, 1.11.2
>
>
> Reported in the ML: 
> https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E
> If a user provides a partitioner in combination with SerializationSchema it 
> is not initialized correctly and has no access to the parallel instance index 
> or number of parallel instances.



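
The symptom described above (all records landing in partition 0) follows directly from how a fixed partitioner derives its target from the subtask index set in `open(...)`. A minimal sketch of that logic (simplified and illustrative only, not Flink's actual `FlinkFixedPartitioner` source) shows that if the framework never calls `open()`, the index keeps its default of 0 and every subtask writes to partition 0:

```java
// Simplified sketch of fixed-partitioner behaviour (illustrative only).
public class FixedPartitionerSketch {

	private int parallelInstanceId; // defaults to 0 if open() is never called

	// The framework is expected to call this during initialization.
	public void open(int parallelInstanceId, int parallelInstances) {
		this.parallelInstanceId = parallelInstanceId;
	}

	// Each subtask sticks to one Kafka partition, chosen from its index.
	public int partition(int[] partitions) {
		return partitions[parallelInstanceId % partitions.length];
	}
}
```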


[GitHub] [flink] flinkbot edited a comment on pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerComponent

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13319:
URL: https://github.com/apache/flink/pull/13319#issuecomment-686508323


   
   ## CI report:
   
   * 9fab4d5bbccc29d78dee92726cce852421e40541 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6170)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13320: [FLINK-19035] Remove fold from DataStream API

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13320:
URL: https://github.com/apache/flink/pull/13320#issuecomment-686520258


   
   ## CI report:
   
   * a424075cc80ced3f5620b580134393213cc378a5 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6172)
 
   
   




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483158834



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+/**
+ * Extract from an OperatorState a set of state file paths.
+ */
+public class StatePathExtractor implements FlatMapFunction {

Review comment:
   Add a serialVersionUID





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483158317



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+/**
+ * Extract from an OperatorState a set of state file paths.

Review comment:
   ```suggestion
* Extracts all file paths that are part of the provided {@link 
OperatorState}
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483157850



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */
+public final class FileCopyRichMapFunction extends RichMapFunction {
+   // the destination path to copy file
+   private final String path;
+
+   public FileCopyRichMapFunction(String path) {
+   this.path = path;

Review comment:
   ```suggestion
this.path = Preconditions.checkNotNull(path, "The destination 
path cannot be null");
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483157398



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */

Review comment:
   ```suggestion
   /**
* This mapper copies files from an existing savepoint into a new directory.
*/
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483156521



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyRichMapFunction.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * A {@link RichMapFunction} for copying files.
+ */
+public final class FileCopyRichMapFunction extends RichMapFunction {

Review comment:
   Add a serialVersionUID





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13309: [Flink-14942][state-processor-api] savepoint deep copy

2020-09-03 Thread GitBox


sjwiesman commented on a change in pull request #13309:
URL: https://github.com/apache/flink/pull/13309#discussion_r483156392



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
##
@@ -90,38 +93,37 @@ public final void write(String path) {
 
List existingOperators = 
metadata.getExistingOperators();
 
-   DataSet finalOperatorStates = 
unionOperatorStates(newOperatorStates, existingOperators);
-
-   finalOperatorStates
-   .reduceGroup(new 
MergeOperatorStates(metadata.getMasterStates()))
-   .name("reduce(OperatorState)")
-   .output(new SavepointOutputFormat(savepointPath))
-   .name(path);
-   }
-
-   private DataSet 
unionOperatorStates(DataSet newOperatorStates, 
List existingOperators) {
DataSet finalOperatorStates;
if (existingOperators.isEmpty()) {
finalOperatorStates = newOperatorStates;
} else {
-   DataSet wrappedCollection = 
newOperatorStates
-   .getExecutionEnvironment()
+   DataSet existingOperatorStates = 
newOperatorStates.getExecutionEnvironment()
.fromCollection(existingOperators);
 
-   finalOperatorStates = 
newOperatorStates.union(wrappedCollection);
+   existingOperatorStates
+   .flatMap(new StatePathExtractor())

Review comment:
   We need to pin the parallelism of this to `1`. The collection source 
only runs a parallelism `1` so it will chain and then the paths will 
redistribute to the copy method. Otherwise we are not properly distributing 
this work. 
   
   ```suggestion
.flatMap(new StatePathExtractor())
.setParallelism(1)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13305: [FLINK-19109][task] Ignore isLoopRunning in MailboxExecutor.isIdle

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13305:
URL: https://github.com/apache/flink/pull/13305#issuecomment-685657746


   
   ## CI report:
   
   * 380b8d7be6ae142a27061c6d529c2ca059a51aa5 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6168)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13316:
URL: https://github.com/apache/flink/pull/13316#issuecomment-686398901


   
   ## CI report:
   
   * e37cb771c66ed8cab48e0b7abd53132fb15dfca3 UNKNOWN
   * 5ccdc55dc70a57332c50bd59c0a8e6e59a29bffd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6169)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13128: [FLINK-18795][hbase] Support for HBase 2

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13128:
URL: https://github.com/apache/flink/pull/13128#issuecomment-672766836


   
   ## CI report:
   
   * 313b80f4474455d7013b0852929b5a8458f391a1 UNKNOWN
   * ece578a9fdbaee4d815de501187d92a729790c9b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6174)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean

2020-09-03 Thread GitBox


rkhachatryan commented on pull request #13040:
URL: https://github.com/apache/flink/pull/13040#issuecomment-686639977


   Thanks for updating your PR @echauchot 
   There is no cyclic dependency now and `CompletedCheckpointStore`s are not 
affected!
   
   I'm not sure I understood your point about having the discard logic in the 
`Pending/CompletedCheckpoint` classes. I'd prefer not to put execution (or any other) 
logic there, as their concern IMO is just to represent checkpoints.
   
   I think something like this should be possible:
   ```
   public class CheckpointsCleaner {
   public void clean(Runnable cleanAction, Runnable postCleanAction) {
   counter.incrementAndGet();
   executor.execute(() -> {
   try { cleanAction.run(); }
   finally {
   counter.decrementAndGet();
   postCleanAction.run();
   }
   });
   }
   }
   ```
   Then in `CheckpointCoordinator`:
   ```
   pendingCheckpoint.finalizeCheckpoint(checkpointsCleaner::clean)
   // CompletedCheckpoint saves this callback in a field
   ```
   
   And in `CompletedCheckpoint`:
   ```
   void asyncDiscardCheckpointAndCountCheckpoint(Consumer 
discardAction) {
   discardCallbackRunner.accept(
   () -> discardAction.accept(this),
   checkpointCleaningFinishedCallback);
   }
   ```
   
   WDYT?
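The `CheckpointsCleaner` snippet above can be expanded into a standalone, compilable sketch. The names follow the proposal in this comment; this is not the code that was eventually merged, and the executor/shutdown handling is an assumption for the sake of a runnable example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of the proposed cleaner: runs clean actions asynchronously and
 *  counts how many are still in flight, as back-pressure input. */
class CheckpointsCleanerSketch {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void clean(Runnable cleanAction, Runnable postCleanAction) {
        inFlight.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                inFlight.decrementAndGet();
                postCleanAction.run(); // e.g. notify the coordinator
            }
        });
    }

    /** The coordinator can consult this count to apply back-pressure. */
    int pendingCleanups() {
        return inFlight.get();
    }

    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CheckpointsCleanerSketch cleaner = new CheckpointsCleanerSketch();
        AtomicInteger cleaned = new AtomicInteger();
        cleaner.clean(cleaned::incrementAndGet, () -> {});
        cleaner.shutdown(); // waits for the pending clean action to finish
        System.out.println("cleaned=" + cleaned.get()); // prints cleaned=1
    }
}
```

The counter is the piece that lets the coordinator throttle new checkpoints when too many old ones are still being discarded.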



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException

2020-09-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-19135:
-
Issue Type: Bug  (was: Improvement)

> (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
> --
>
> Key: FLINK-19135
> URL: https://issues.apache.org/jira/browse/FLINK-19135
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet, API / DataStream
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In FLINK-14850 we changed the {{execute()}} method to be basically
> {code}
> final JobClient jobClient = executeAsync(...);
> return jobClient.getJobExecutionResult(userClassloader).get();
> {code}
> Unfortunately, this means that {{execute()}} now throws an 
> {{ExecutionException}} instead of a {{ProgramInvocationException}} or 
> {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping 
> the other exceptions that we were throwing before.
> We didn't notice this in tests because most tests use 
> {{Test(Stream)Environment}} which overrides the {{execute()}} method and so 
> doesn't go through the {{PipelineExecutor}} logic or the normal code path of 
> delegating to {{executeAsync()}}.
> We should fix this to go back to the old behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException

2020-09-03 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-19135:


 Summary: (Stream)ExecutionEnvironment.execute() should not throw 
ExecutionException
 Key: FLINK-19135
 URL: https://issues.apache.org/jira/browse/FLINK-19135
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet, API / DataStream
Reporter: Aljoscha Krettek


In FLINK-14850 we changed the {{execute()}} method to be basically
{code}
final JobClient jobClient = executeAsync(...);
return jobClient.getJobExecutionResult(userClassloader).get();
{code}

Unfortunately, this means that {{execute()}} now throws an 
{{ExecutionException}} instead of a {{ProgramInvocationException}} or 
{{JobExecutionException}} as before. The {{ExecutionException}} is wrapping the 
other exceptions that we were throwing before.

We didn't notice this in tests because most tests use 
{{Test(Stream)Environment}} which overrides the {{execute()}} method and so 
doesn't go through the {{PipelineExecutor}} logic or the normal code path of 
delegating to {{executeAsync()}}.

We should fix this to go back to the old behaviour.
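The wrapping described above is ordinary `Future.get()` semantics: the real failure ends up as the cause of an `ExecutionException`, so a caller must unwrap it to see the exception that `execute()` used to throw directly. A hedged sketch (the exception type below is a stand-in, not Flink's `JobExecutionException`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Shows how Future.get() wraps the original failure in ExecutionException. */
class UnwrapSketch {

    /** Returns the root failure of a completed-exceptionally future, unwrapped. */
    static Throwable rootFailure(CompletableFuture<?> result) {
        try {
            result.get();
            return null; // completed successfully
        } catch (ExecutionException e) {
            return e.getCause(); // the exception the job actually failed with
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("job failed"));
        Throwable cause = rootFailure(failed);
        System.out.println(cause.getClass().getSimpleName()); // prints IllegalStateException
    }
}
```

Restoring the old behaviour amounts to doing this unwrapping inside `execute()` instead of leaking the `ExecutionException` to user code.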



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException

2020-09-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-19135:


Assignee: Aljoscha Krettek

> (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
> --
>
> Key: FLINK-19135
> URL: https://issues.apache.org/jira/browse/FLINK-19135
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet, API / DataStream
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In FLINK-14850 we changed the {{execute()}} method to be basically
> {code}
> final JobClient jobClient = executeAsync(...);
> return jobClient.getJobExecutionResult(userClassloader).get();
> {code}
> Unfortunately, this means that {{execute()}} now throws an 
> {{ExecutionException}} instead of a {{ProgramInvocationException}} or 
> {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping 
> the other exceptions that we were throwing before.
> We didn't notice this in tests because most tests use 
> {{Test(Stream)Environment}} which overrides the {{execute()}} method and so 
> doesn't go through the {{PipelineExecutor}} logic or the normal code path of 
> delegating to {{executeAsync()}}.
> We should fix this to go back to the old behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13305: [FLINK-19109][task] Ignore isLoopRunning in MailboxExecutor.isIdle

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13305:
URL: https://github.com/apache/flink/pull/13305#issuecomment-685657746


   
   ## CI report:
   
   * b37d13c99ca3f68f80b26a4f97371d309b124810 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6160)
 
   * 380b8d7be6ae142a27061c6d529c2ca059a51aa5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6168)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13316:
URL: https://github.com/apache/flink/pull/13316#issuecomment-686398901


   
   ## CI report:
   
   * e37cb771c66ed8cab48e0b7abd53132fb15dfca3 UNKNOWN
   * a7f19f060e5ec83090a5265e9be287bd8dbe464e Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6163)
 
   * 5ccdc55dc70a57332c50bd59c0a8e6e59a29bffd Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6169)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884


   
   ## CI report:
   
   * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN
   * 8036016c752bce433dc65d1c08695377c917836f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6162)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13227: [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13227:
URL: https://github.com/apache/flink/pull/13227#issuecomment-679046587


   
   ## CI report:
   
   * a439269835304a2a64316bcc6779c8997ba4716d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6159)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-18815) AbstractCloseableRegistryTest.testClose unstable

2020-09-03 Thread Kezhu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190276#comment-17190276
 ] 

Kezhu Wang commented on FLINK-18815:


It seems that the two recent cases are leaks, while the previous cases were 
duplicate closes. I think these two cases are caused by 
{{SafetyNetCloseableRegistry.close}}, which interrupts the reaper thread. Suppose 
that:
 1. A closeable became phantom reachable and queued in 
{{CloseableReaperThread.referenceQueue}} but did not get a chance to close.
 2. {{SafetyNetCloseableRegistry.close}} calls 
{{CloseableReaperThread.interrupt}} which set {{CloseableReaperThread.running}} 
to false and interrupt that java thread.
 3. {{CloseableReaperThread}} terminates due to false 
{{CloseableReaperThread.running}} or {{InterruptedException}}.
 4. That enqueued closeable leaks.

I think there are two different approaches to fix this issue:
 * Use at most one {{CloseableReaperThread}}, and don't close it. This may 
leak if Flink is embedded as a guest in another host application.
 * Count registered phantom references, and close the reaper thread only after all 
registered phantom references have been popped and the {{CloseableReaperThread}} has 
been dropped by its caller.

Since Flink is not always the end application, I think the counting approach may be 
more appropriate?

As an analogy, {{java.lang.ref.Cleaner}} has no close-like method; it [tracks 
all registered 
referents|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/PhantomCleanable.java#L65],
 its underlying thread will terminate after 
[itself|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/CleanerImpl.java#L101]
 and [all registered 
references|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/jdk/internal/ref/CleanerImpl.java#L133]
 are cleaned.

[~kevin.cyj] [~dian.fu] [~trohrmann] Any thoughts ?
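The counting alternative can be modeled without real GC machinery. This is a toy sketch of the termination condition only (illustrative names); the actual registry would count phantom references as they are popped from a `ReferenceQueue`:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the counting idea: the reaper may not terminate while
 *  registered-but-not-yet-reaped closeables remain, even after close. */
class CountingReaperSketch {
    private final AtomicInteger registered = new AtomicInteger();
    private volatile boolean closeRequested = false;

    /** A closeable was wrapped and tracked (phantom reference created). */
    void register() { registered.incrementAndGet(); }

    /** One tracked closeable was popped from the queue and cleaned up. */
    void reapOne() { registered.decrementAndGet(); }

    /** Corresponds to SafetyNetCloseableRegistry.close(): stop accepting work. */
    void requestClose() { closeRequested = true; }

    /** The reaper thread exits only when closed AND nothing is pending. */
    boolean mayTerminate() {
        return closeRequested && registered.get() == 0;
    }

    public static void main(String[] args) {
        CountingReaperSketch reaper = new CountingReaperSketch();
        reaper.register();
        reaper.requestClose();
        System.out.println(reaper.mayTerminate()); // prints false: one closeable pending
        reaper.reapOne();
        System.out.println(reaper.mayTerminate()); // prints true
    }
}
```

Under this model, the scenario in steps 1-4 above cannot leak: close() alone no longer stops the reaper while an enqueued closeable is still waiting to be processed.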

> AbstractCloseableRegistryTest.testClose unstable
> 
>
> Key: FLINK-18815
> URL: https://issues.apache.org/jira/browse/FLINK-18815
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Tests
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Robert Metzger
>Assignee: Kezhu Wang
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5164=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> [ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.509 
> s <<< FAILURE! - in org.apache.flink.core.fs.SafetyNetCloseableRegistryTest
> [ERROR] testClose(org.apache.flink.core.fs.SafetyNetCloseableRegistryTest)  
> Time elapsed: 1.15 s  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<-1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.core.fs.AbstractCloseableRegistryTest.testClose(AbstractCloseableRegistryTest.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13321: [FLINK-14870] Remove nullable assumption of task slot sharing group

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13321:
URL: https://github.com/apache/flink/pull/13321#issuecomment-686567896


   
   ## CI report:
   
   * d79f152fa91b8bc555af6fb2a00a8d62b184be5a UNKNOWN
   * 7039064922aaec22752ac84e4e9d41d663e68a14 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6175)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13284: [FLINK-17016][runtime] Enable pipelined region scheduling

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13284:
URL: https://github.com/apache/flink/pull/13284#issuecomment-683683000


   
   ## CI report:
   
   * a541f5410ff96d39708288ae7aa672ed2ebb05a8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6135)
 
   * 71281ac4921c174c214f2393e169e7140698af2d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6177)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13318: [BP-1.11][FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13318:
URL: https://github.com/apache/flink/pull/13318#issuecomment-686450863


   
   ## CI report:
   
   * 6c93ba2f6b0915301a85b181b895a4021eab897b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6161)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator

2020-09-03 Thread GitBox


flinkbot edited a comment on pull request #13181:
URL: https://github.com/apache/flink/pull/13181#issuecomment-675091412


   
   ## CI report:
   
   * e7553689356882de1ffe606400d1255d1d757bc4 UNKNOWN
   * 18f88af3b438b13e9a240efd2b4979f841d2b978 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5939)
   * a96b2db52a0db507e0077266c8e9cb947413e1ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6176)
 
   
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerCompone

2020-09-03 Thread GitBox


tillrohrmann commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r483078088



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##
@@ -194,8 +194,9 @@ public Dispatcher(
	public void onStart() throws Exception {
		try {
			startDispatcherServices();
-		} catch (Exception e) {
-			final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
+		} catch (Throwable t) {
+			getTerminationFuture().completeExceptionally(t);

Review comment:
   This should not be necessary.

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##
@@ -513,10 +513,10 @@ public State terminate(AkkaRpcActor akkaRpcActor) {
	try {
		terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
	} catch (Throwable t) {
-		terminationFuture = FutureUtils.completedExceptionally(
-			new AkkaRpcException(
-				String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
-				t));
+		String errorMsg = String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
+		LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, t);
+		terminationFuture = FutureUtils.completedExceptionally(new AkkaRpcException(errorMsg, t));
+		akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(t);

Review comment:
   This should not be necessary.

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##
@@ -88,6 +95,18 @@
 
	private void registerShutDownFuture() {
		FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
+		BiConsumer terminateAction = (ignored, throwable) -> {
+			if (throwable != null) {
+				shutDownFuture.completeExceptionally(throwable);
+			} else {
+				shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+			}
+			if (isRunning.get()) {
+				fatalErrorHandler.onFatalError(throwable);
+			}
+		};
+		dispatcherRunner.getTerminationFuture().whenComplete(terminateAction);

Review comment:
   I think the `DispatcherRunner.getTerminationFuture` won't complete if the `Dispatcher` terminates. The problem is that we only forward the result of the `DispatcherLeaderProcess.terminationFuture` to the `DispatcherRunner.terminationFuture` if we call `closeAsync`. I would suggest to add tests to verify the intended behavior.
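[Editor's note] The forwarding pattern the review comment refers to can be sketched as follows. This is a minimal stand-in, not Flink's actual `FutureUtils.forward` implementation; the class name `ForwardDemo` and the two example futures are illustrative only. The point is that the target future only ever completes if `forward` is actually invoked on that pair of futures:

```java
import java.util.concurrent.CompletableFuture;

public class ForwardDemo {

    // Propagate the outcome (value or exception) of `source` to `target`
    // as soon as `source` completes.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, throwable) -> {
            if (throwable != null) {
                target.completeExceptionally(throwable);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> processTermination = new CompletableFuture<>();
        CompletableFuture<String> runnerTermination = new CompletableFuture<>();

        // Without this call, runnerTermination never completes -- the gap the
        // reviewer points out when forwarding only happens inside closeAsync().
        forward(processTermination, runnerTermination);

        processTermination.complete("terminated");
        System.out.println(runnerTermination.join()); // prints "terminated"
    }
}
```

If the forwarding is only wired up in a shutdown path, a termination triggered elsewhere leaves the outer future pending forever, which is why the reviewer asks for tests covering that case.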

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
##
@@ -107,13 +112,18 @@ public void grantLeadership(UUID leaderSessionID) {
	}
 
	private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
-		stopDispatcherLeaderProcess();
+		try {
+			stopDispatcherLeaderProcess();
 
-		dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
+			dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
 
-		final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
-		FutureUtils.assertNoException(
-			previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+			final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
+			FutureUtils.assertNoException(
+				previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+		} catch (Throwable t) {
+			terminationFuture.completeExceptionally(t);
+			throw t;

Review comment:
   Why is this needed?

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##
@@ -88,6 +95,18 @@
 
	private void registerShutDownFuture() {
		FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
+
