[GitHub] [flink] flinkbot commented on pull request #13459: [FLINK-18199] [chinese-translation]: translate FileSystem SQL Connector page into Chinese

2020-09-22 Thread GitBox


flinkbot commented on pull request #13459:
URL: https://github.com/apache/flink/pull/13459#issuecomment-697149813


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit a3e5590fcd9193b1699c4539cf93aa175b14d829 (Wed Sep 23 
05:54:42 UTC 2020)
   
✅ no warnings
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention].
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11359: [FLINK-16095] [docs-zh] Translate "Modules" page of "Table API & SQL" into Chinese

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #11359:
URL: https://github.com/apache/flink/pull/11359#issuecomment-596916485


   
   ## CI report:
   
   * a42e8d680a2a0d344a0ffda29c2891627f7d1e2d Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/170635599) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3123)
 
   * a6f603321cbeca978ad64118ce9379ff1cb57316 UNKNOWN
   
   
   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Updated] (FLINK-18199) Translate "Filesystem SQL Connector" page into Chinese

2020-09-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18199:
---
Labels: pull-request-available  (was: )

> Translate "Filesystem SQL Connector" page into Chinese
> --
>
> Key: FLINK-18199
> URL: https://issues.apache.org/jira/browse/FLINK-18199
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Connectors / FileSystem, 
> Documentation, Table SQL / Ecosystem
>Reporter: Jark Wu
>Assignee: michaelli
>Priority: Major
>  Labels: pull-request-available
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connectors/filesystem.html
> The markdown file is located in 
> flink/docs/dev/table/connectors/filesystem.zh.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] michaelli916 opened a new pull request #13459: [FLINK-18199] [chinese-translation]: translate FileSystem SQL Connector page into Chinese

2020-09-22 Thread GitBox


michaelli916 opened a new pull request #13459:
URL: https://github.com/apache/flink/pull/13459


   
   
   ## What is the purpose of the change
   
   *Translate "Filesystem SQL Connector" page into Chinese*
   
   
   ## Brief change log
   
   *Translate "Filesystem SQL Connector" page into Chinese*
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable







[GitHub] [flink] danny0405 commented on pull request #13033: [FLINK-18777][catalog] Supports schema registry catalog

2020-09-22 Thread GitBox


danny0405 commented on pull request #13033:
URL: https://github.com/apache/flink/pull/13033#issuecomment-697103132


   @maver1ck The code seems straightforward. @dawidwys said that he would like to review this PR, but I'm not sure if he has time.







[GitHub] [flink] flinkbot edited a comment on pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13451:
URL: https://github.com/apache/flink/pull/13451#issuecomment-696565311


   
   ## CI report:
   
   * e682b70e2b68e60ed3950290b1ebe53a6ecd40d8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6757)
 
   * cb9329281ad618abbd1c75a1922e038a0869944e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6811)
 
   * fc7702fc7d994c1fc449b36c365ade2f43b3e7e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6819)
 
   
   
   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] wuchong closed pull request #9356: [FLINK-13340][kafka][table] Add 'topics' and 'subscriptionPattern' option for Flink Kafka connector

2020-09-22 Thread GitBox


wuchong closed pull request #9356:
URL: https://github.com/apache/flink/pull/9356


   







[GitHub] [flink] HuangXingBo commented on pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator

2020-09-22 Thread GitBox


HuangXingBo commented on pull request #13451:
URL: https://github.com/apache/flink/pull/13451#issuecomment-697073756


   @dianfu Thanks a lot for the update. I have addressed the comments at the 
latest commit.







[GitHub] [flink] lirui-apache commented on pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…

2020-09-22 Thread GitBox


lirui-apache commented on pull request #13455:
URL: https://github.com/apache/flink/pull/13455#issuecomment-697115923


   @JingsongLi Thanks for the review. I have verified the tests with Hive-1.0.1.







[GitHub] [flink] wuchong commented on pull request #10354: [FLINK-14729][connectors] Multi-topics consuming from KafkaTableSource

2020-09-22 Thread GitBox


wuchong commented on pull request #10354:
URL: https://github.com/apache/flink/pull/10354#issuecomment-697100110


   Sorry for the late reply and for not noticing this PR. Multi-topic support (topic list and topic pattern) for the SQL connector was added in FLINK-18449.
   
   How about closing this PR and FLINK-14729?







[GitHub] [flink] leonardBang commented on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join

2020-09-22 Thread GitBox


leonardBang commented on pull request #13289:
URL: https://github.com/apache/flink/pull/13289#issuecomment-697081273


   > The compile is failed, please fix it first.
   > 
   > ```
   > FlinkPlannerImpl.scala message=File line length exceeds 100 characters 
line=168
   > ```
   
   I've fixed it and rebased.







[GitHub] [flink] Shawn-Hx commented on pull request #13410: [FLINK-19247][docs-zh] Update Chinese documentation after removal of Kafka 0.10 and 0.11

2020-09-22 Thread GitBox


Shawn-Hx commented on pull request #13410:
URL: https://github.com/apache/flink/pull/13410#issuecomment-697080083


   Hi, @klion26 
   This PR is ready to be reviewed. Could you help review it at your convenience?
   Thanks~







[GitHub] [flink] wsry commented on a change in pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

2020-09-22 Thread GitBox


wsry commented on a change in pull request #13447:
URL: https://github.com/apache/flink/pull/13447#discussion_r493159836



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
##
@@ -63,6 +63,22 @@ public BoundedBlockingResultPartition(
bufferPoolFactory);
}
 
+   @Override
+   public void flush(int targetSubpartition) {
+   finishBroadcastBufferBuilder();

Review comment:
   This logic is already in BoundedBlockingResultPartition and should have no impact on streaming/pipelined cases. I guess the source file name is misread?










[GitHub] [flink-playgrounds] shuiqiangchen commented on pull request #16: [FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground.

2020-09-22 Thread GitBox


shuiqiangchen commented on pull request #16:
URL: https://github.com/apache/flink-playgrounds/pull/16#issuecomment-697071304


   @alpinegizmo Thank you for polishing the doc. I have updated the PR; please have a look.







[GitHub] [flink] flinkbot edited a comment on pull request #11896: [FLINK-14356] [single-value] Introduce "single-value" format to (de)serialize message to a single field

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #11896:
URL: https://github.com/apache/flink/pull/11896#issuecomment-618808279











[GitHub] [flink] wuchong merged pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join

2020-09-22 Thread GitBox


wuchong merged pull request #13289:
URL: https://github.com/apache/flink/pull/13289


   







[GitHub] [flink-statefun] galenwarren commented on pull request #152: [FLINK-19176] Add pluggable statefun payload serializer

2020-09-22 Thread GitBox


galenwarren commented on pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#issuecomment-696388005


   @igalshilman Thanks!
   
   I'll take a look at the others shortly, but wanted to quickly reply regarding the question about Optional and the serialized snapshot format. (I can't reply to the comment itself for some reason ...) What you propose is absolutely doable. I just wanted to make sure you're aware that the suggested code is essentially identical to the implementation of `StringUtils.writeNullableString`, which is what is currently used to write the version 2 snapshot in `MessageTypeSerializer.Snapshot.writeSnapshot`, i.e.:
   
   ```
   @Override
   public void writeSnapshot(DataOutputView dataOutputView) throws IOException {
       // version 1
       dataOutputView.writeUTF(messageFactoryKey.getType().name());

       // added in version 2
       StringUtils.writeNullableString(
           messageFactoryKey.getCustomPayloadSerializerClassName(), dataOutputView);
   }
   ```
   ... and ...
   
   ```
   public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException {
       if (str != null) {
           out.writeBoolean(true);
           writeString(str, out);
       } else {
           out.writeBoolean(false);
       }
   }
   ```
   
   So the serialized format would already seem to be what you've requested! But if you'd prefer it be written without using `StringUtils.writeNullableString`, I'm happy to make that change.
   
   
   







[GitHub] [flink] flinkbot edited a comment on pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13447:
URL: https://github.com/apache/flink/pull/13447#issuecomment-696488834











[GitHub] [flink] JingsongLi merged pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…

2020-09-22 Thread GitBox


JingsongLi merged pull request #13455:
URL: https://github.com/apache/flink/pull/13455


   







[GitHub] [flink] JingsongLi merged pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-22 Thread GitBox


JingsongLi merged pull request #13414:
URL: https://github.com/apache/flink/pull/13414


   







[GitHub] [flink] wuchong commented on pull request #9356: [FLINK-13340][kafka][table] Add 'topics' and 'subscriptionPattern' option for Flink Kafka connector

2020-09-22 Thread GitBox


wuchong commented on pull request #9356:
URL: https://github.com/apache/flink/pull/9356#issuecomment-697100549


   Close this as it has been supported in FLINK-18449.







[GitHub] [flink] flinkbot edited a comment on pull request #13419: [FLINK-18842][e2e] Added 10min timeout to building the container.

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13419:
URL: https://github.com/apache/flink/pull/13419#issuecomment-694745717











[GitHub] [flink] flinkbot edited a comment on pull request #13406: [FLINK-19243][elasticsearch] Bump snakeyaml to 1.27

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13406:
URL: https://github.com/apache/flink/pull/13406#issuecomment-694116951











[GitHub] [flink] xintongsong commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


xintongsong commented on pull request #13453:
URL: https://github.com/apache/flink/pull/13453#issuecomment-696625955











[GitHub] [flink] zentol commented on a change in pull request #13419: [FLINK-18842][e2e] Added 10min timeout to building the container.

2020-09-22 Thread GitBox


zentol commented on a change in pull request #13419:
URL: https://github.com/apache/flink/pull/13419#discussion_r492588052



##
File path: flink-end-to-end-tests/test-scripts/common.sh
##
@@ -830,3 +830,30 @@ run_test_with_timeout() {
   $TEST_COMMAND
   )
 }
+
+run_with_timeout() {
+  local timeout="$1"
+  shift
+  local pidFile="$1"
+  shift
+  local command="$@"
+
+  # invoke command
+  (eval "$command"; rm $pidFile; ) &
+  pid=$!
+  echo $pid > $pidFile
+
+  # invoke timeout guard
+  (
+sleep $timeout
+if [[ -e $pidFile ]]; then

Review comment:
   We should be able to re-use `run_test_with_timeout`. All we basically 
need to do is move this
   ``` 
   echo "Printing Flink logs and killing it:"
   cat ${FLINK_DIR}/log/*
   ```
   into a separate function, and add a new `run_with_timeout` that accepts a 
timeout, run-command, and on-failure-command.
   
   ```
   run_test_with_timeout() {
     local TEST_TIMEOUT_SECONDS=$1
     shift
     local TEST_COMMAND=$@
     internal_run_with_timeout $TEST_TIMEOUT_SECONDS "$TEST_COMMAND" print_logs
   }

   run_with_timeout() {
     internal_run_with_timeout $1 "$2" ""
   }

   internal_run_with_timeout() {
     local timeout=$1
     local runCommand=$2
     local onFailureCommand=$3

     # do stuff
   }

   print_logs() {
     echo "Printing Flink logs and killing it:"
     cat ${FLINK_DIR}/log/*
   }
   ```










[GitHub] [flink-statefun] tzulitai closed pull request #159: [FLINK-19330][core] Move intialization logic to open() instead initializeState

2020-09-22 Thread GitBox


tzulitai closed pull request #159:
URL: https://github.com/apache/flink-statefun/pull/159


   







[GitHub] [flink-statefun] tzulitai closed pull request #157: [FLINK-19327][k8s] Bump JobManager heap size to 1 GB

2020-09-22 Thread GitBox


tzulitai closed pull request #157:
URL: https://github.com/apache/flink-statefun/pull/157


   







[GitHub] [flink] flinkbot edited a comment on pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13228:
URL: https://github.com/apache/flink/pull/13228#issuecomment-679099456











[GitHub] [flink] flinkbot edited a comment on pull request #13420: [FLINK-19229][python] Introduce the PythonStreamGroupAggregateOperator for Python UDAF.

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13420:
URL: https://github.com/apache/flink/pull/13420#issuecomment-694793326











[GitHub] [flink] xintongsong edited a comment on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


xintongsong edited a comment on pull request #13453:
URL: https://github.com/apache/flink/pull/13453#issuecomment-696625955


   Chiming in with a minor comment:
   > The Serial Version UID for new classes should start at 1.
   
   Suggested by the code style guidelines.
   
https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization







[GitHub] [flink] dawidwys edited a comment on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x

2020-09-22 Thread GitBox


dawidwys edited a comment on pull request #13450:
URL: https://github.com/apache/flink/pull/13450#issuecomment-696655915


   It is relevant for `AvroSerializer` (which I updated through 
`AvroFactory#fromSpecific` and added a test for it: 
`AvroUnionLogicalSerializerTest.java`).
   
   I checked all places where we instantiate the `SpecificData` manually and updated those.
   
   In the `AvroInputFormat` we use the `SpecificDatumReader` which should just 
work (it has that logic of acquiring `SpecificData` built in).







[GitHub] [flink] flinkbot commented on pull request #13456: Single task add partial flag in buffer

2020-09-22 Thread GitBox


flinkbot commented on pull request #13456:
URL: https://github.com/apache/flink/pull/13456#issuecomment-696739793











[GitHub] [flink] kl0u commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput

2020-09-22 Thread GitBox


kl0u commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r492589663



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker 
latencyMarker) throws Exception {
public void setKeyContextElement(StreamRecord record) throws Exception {
owner.internalSetKeyContextElement(record, stateKeySelector);
}
+
+   @Override
+   public void endInput() throws Exception {
+   if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   Here can it be any other inputId apart from 1?


##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker 
latencyMarker) throws Exception {
public void setKeyContextElement(StreamRecord record) throws Exception {
owner.internalSetKeyContextElement(record, stateKeySelector);
}
+
+   @Override
+   public void endInput() throws Exception {
+   if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   The reason I am asking is that this check did not exist in any of the methods removed by this PR which were handling the input-closing logic. This makes me wonder whether we are correct here.
   
   In my opinion, although from the code this check may be safe, to be on the 
safe side it may make sense to remove it so that the logic is the same. But 
feel free to disagree and leave it as is.
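   To make the guard under discussion concrete, here is a minimal, self-contained sketch using hypothetical stand-in types (not Flink's actual classes): each logical input forwards `endInput()` to its owner only when the owner implements `BoundedOneInput` and the input id is 1, so the single-input contract fires at most once.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class EndInputDispatch {

       // Hypothetical stand-in for Flink's BoundedOneInput contract;
       // only the operator-level endInput() exists on the owner.
       public interface BoundedOneInput {
           void endInput();
       }

       // Each logical input forwards endInput() to its owner, guarded so the
       // owner's endInput() fires only for input 1, mirroring the
       // `owner instanceof BoundedOneInput && inputId == 1` check above.
       public static class Input {
           private final Object owner;
           private final int inputId;

           public Input(Object owner, int inputId) {
               this.owner = owner;
               this.inputId = inputId;
           }

           public void endInput() {
               if (owner instanceof BoundedOneInput && inputId == 1) {
                   ((BoundedOneInput) owner).endInput();
               }
           }
       }

       public static void main(String[] args) {
           List<String> calls = new ArrayList<>();
           BoundedOneInput op = () -> calls.add("endInput");
           new Input(op, 1).endInput();
           new Input(op, 2).endInput(); // guard prevents a second call
           System.out.println(calls); // [endInput]
       }
   }
   ```

   Whether the `inputId == 1` guard should stay is exactly the judgment call raised in the review: it is safe, but it was not present in the removed methods.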









[GitHub] [flink-statefun] tzulitai closed pull request #158: [FLINK-19329] FunctionGroupOperator#dispose() might throw NPE during an unclean shutdown.

2020-09-22 Thread GitBox


tzulitai closed pull request #158:
URL: https://github.com/apache/flink-statefun/pull/158


   







[GitHub] [flink] twalthr commented on pull request #13430: [BP-1.11][FLINK-19140] Fix UDTF documentation which uses wrong alias

2020-09-22 Thread GitBox


twalthr commented on pull request #13430:
URL: https://github.com/apache/flink/pull/13430#issuecomment-696769329


   Merged into 1.11 branch.







[GitHub] [flink] aljoscha commented on pull request #8207: [FLINK-12250] Rewrite assembleNewPartPath to let it return a new PartPath

2020-09-22 Thread GitBox


aljoscha commented on pull request #8207:
URL: https://github.com/apache/flink/pull/8207#issuecomment-696772477


   This actually looks like a good fix. I rebased, will run CI and then merge.







[GitHub] [flink] kottmann commented on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-09-22 Thread GitBox


kottmann commented on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-697013989


   Things to be considered:
   - Any opinions about the file size? What is a good size?
   - The maximum number of files could be made configurable.
   - `OnStartupTriggeringPolicy` can be used to replace the rolling-on-startup code in the shell scripts.
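A hedged sketch of what such a log4j2 rolling-file configuration might look like (the size limit, file count, and the `appender.rolling` naming are illustrative assumptions, not values taken from this PR):

```properties
# Hypothetical log4j2 rolling-file appender sketch
appender.rolling.type = RollingFile
appender.rolling.name = RollingFileAppender
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
# Roll once the file reaches a size limit ...
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
# ... and once on every process start, which could replace the shell-script rotation
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
```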







[GitHub] [flink-statefun] tzulitai commented on pull request #158: [FLINK-19329] FunctionGroupOperator#dispose() might throw NPE during an unclean shutdown.

2020-09-22 Thread GitBox


tzulitai commented on pull request #158:
URL: https://github.com/apache/flink-statefun/pull/158#issuecomment-696548606


   LGTM, +1







[GitHub] [flink-statefun] tzulitai closed pull request #160: [hotfix][dockerfile] Add newline to end of flink-conf

2020-09-22 Thread GitBox


tzulitai closed pull request #160:
URL: https://github.com/apache/flink-statefun/pull/160


   







[GitHub] [flink] StephanEwen commented on a change in pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

2020-09-22 Thread GitBox


StephanEwen commented on a change in pull request #13447:
URL: https://github.com/apache/flink/pull/13447#discussion_r492720319



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
##
@@ -109,89 +94,58 @@
}
}
 
-   protected void emit(T record, int targetChannel) throws IOException, 
InterruptedException {
+   protected void emit(T record, int targetSubpartition) throws 
IOException {
checkErroneous();
 
-   serializer.serializeRecord(record);
-
-   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
-   copyFromSerializerToTargetChannel(targetChannel);
-   }
-
-   /**
-* @param targetChannel
-* @return true if the intermediate serialization buffer 
should be pruned
-*/
-   protected boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
-   // We should reset the initial position of the intermediate 
serialization buffer before
-   // copying, so the serialization results can be copied to 
multiple target buffers.
-   serializer.reset();
-
-   boolean pruneTriggered = false;
-   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
-   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
-   while (result.isFullBuffer()) {
-   finishBufferBuilder(bufferBuilder);
-
-   // If this was a full record, we are done. Not breaking 
out of the loop at this point
-   // will lead to another buffer request before breaking 
out (that would not be a
-   // problem per se, but it can lead to stalls in the 
pipeline).
-   if (result.isFullRecord()) {
-   pruneTriggered = true;
-   emptyCurrentBufferBuilder(targetChannel);
-   break;
-   }
-
-   bufferBuilder = requestNewBufferBuilder(targetChannel);
-   result = serializer.copyToBufferBuilder(bufferBuilder);
-   }
-   checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
+   targetPartition.emitRecord(serializeRecord(serializer, record), 
targetSubpartition);
 
if (flushAlways) {
-   flushTargetPartition(targetChannel);
+   targetPartition.flush(targetSubpartition);
}
-   return pruneTriggered;
}
 
public void broadcastEvent(AbstractEvent event) throws IOException {
broadcastEvent(event, false);
}
 
public void broadcastEvent(AbstractEvent event, boolean 
isPriorityEvent) throws IOException {
-   try (BufferConsumer eventBufferConsumer = 
EventSerializer.toBufferConsumer(event)) {
-   for (int targetChannel = 0; targetChannel < 
numberOfChannels; targetChannel++) {
-   tryFinishCurrentBufferBuilder(targetChannel);
-
-   // Retain the buffer so that it can be recycled 
by each channel of targetPartition
-   
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel, 
isPriorityEvent);
-   }
+   targetPartition.broadcastEvent(event, isPriorityEvent);
 
-   if (flushAlways) {
-   flushAll();
-   }
+   if (flushAlways) {
+   flushAll();
}
}
 
-   public void flushAll() {
-   targetPartition.flushAll();
+   @VisibleForTesting
+   public static ByteBuffer serializeRecord(

Review comment:
   It would be really great if this method were not public. Ideally we can remove it completely, because all tests that use it bypass some crucial logic of this class and may end up being meaningless.
   
   This method is used in three places:
 - The occurrence in `SingleInputGateTest` can be replaced with emitting a record.
 - The occurrence in `TestPartitionProducer` could be removed by adjusting `TestProducerSource` to produce `ByteBuffer` instead of `BufferConsumer`, which looks like a nice change that might even simplify things.
 - The occurrence in `PartitionTestUtils` could in theory be kept, with the visibility of the method reduced to package-private.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
##
@@ -63,6 +63,22 @@ public BoundedBlockingResultPartition(
bufferPoolFactory);
}
 
+   

[GitHub] [flink] flinkbot commented on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.

2020-09-22 Thread GitBox


flinkbot commented on pull request #13448:
URL: https://github.com/apache/flink/pull/13448#issuecomment-696493031











[GitHub] [flink] flinkbot edited a comment on pull request #13128: [FLINK-18795][hbase] Support for HBase 2

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13128:
URL: https://github.com/apache/flink/pull/13128#issuecomment-672766836











[GitHub] [flink-statefun] galenwarren commented on a change in pull request #152: [FLINK-19176] Add pluggable statefun payload serializer

2020-09-22 Thread GitBox


galenwarren commented on a change in pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#discussion_r492357429



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##
@@ -61,4 +64,26 @@ private static void 
validateParentFirstClassloaderPatterns(Configuration configu
 }
 return parentFirstClassloaderPatterns;
   }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration 
configuration) {
+
+MessageFactoryType factoryType =
+configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+String customPayloadSerializerClassName =
+
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+  if 
(StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {

Review comment:
   OK will do

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {

Review comment:
   Yes, and it will be included in the next push

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String 
customPayloadSerializerClassName) {
+this.type = type;
+this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+  }
+
+  public static MessageFactoryKey forType(
+  MessageFactoryType type, String customPayloadSerializerClassName) {
+return new MessageFactoryKey(type, customPayloadSerializerClassName);
+  }
+
+  public MessageFactoryType getType() {
+return this.type;
+  }
+
+  public String getCustomPayloadSerializerClassName() {

Review comment:
   For some reason I couldn't comment here before, but now I can. Weird glitch. Anyway, I left a top-level comment/question for you on this one, just FYI.

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String 
customPayloadSerializerClassName) {
+this.type = type;

Review comment:
   Yes, will be fixed in next push









[GitHub] [flink] zentol commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


zentol commented on pull request #13453:
URL: https://github.com/apache/flink/pull/13453#issuecomment-696681873


   @xintongsong Do you know by chance how to set up IntelliJ to use 1 as the default serialVersionUID?







[GitHub] [flink] rmetzger merged pull request #13409: [FLINK-17910][e2e] Fix debug log output to investigate rare test failure

2020-09-22 Thread GitBox


rmetzger merged pull request #13409:
URL: https://github.com/apache/flink/pull/13409


   







[GitHub] [flink] echauchot commented on a change in pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean

2020-09-22 Thread GitBox


echauchot commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r492597386



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number 
of checkpoints yet
+ * to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+   private final AtomicInteger numberOfCheckpointsToClean;
+   private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+   public CheckpointsCleaner() {
+   this.numberOfCheckpointsToClean = new AtomicInteger(0);
+   }
+
+   int getNumberOfCheckpointsToClean() {
+   return numberOfCheckpointsToClean.get();
+   }
+
+   public void cleanCheckpoint(Runnable cleanAction, Runnable 
postCleanAction, Executor executor) {
+   numberOfCheckpointsToClean.incrementAndGet();
+   executor.execute(() -> {
+   try {
+   cleanAction.run();
+   } finally {
+   numberOfCheckpointsToClean.decrementAndGet();
+   postCleanAction.run();
+   }
+   });
+   }
+
+   public void cleanStates(Runnable postCleanAction, Map<OperatorID, OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor) {
+   numberOfCheckpointsToClean.incrementAndGet();
+   executor.execute(() -> {
+   // discard the private states.
+   // unregistered shared states are still considered 
private at this point.
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+   targetLocation.disposeOnFailure();

Review comment:
   @rkhachatryan can you confirm that this is indeed the refactoring you want before I code it?
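The counting pattern under discussion can be sketched in isolation (a simplified, hedged stand-in for the PR's `CheckpointsCleaner`, not the actual class):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: an AtomicInteger tracks the number of in-flight cleanups that have been
// submitted to an executor, so a coordinator can apply backpressure when too many
// checkpoints are still waiting to be cleaned.
class CleanupTracker {
    private final AtomicInteger pending = new AtomicInteger(0);

    int pendingCleanups() {
        return pending.get();
    }

    void clean(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
        pending.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                // Always decrement, even if cleaning fails, so the count cannot leak.
                pending.decrementAndGet();
                postCleanAction.run();
            }
        });
    }
}
```

With a direct executor (`Runnable::run`) the cleanup runs inline and the counter returns to zero immediately, which is convenient for unit testing this pattern.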









[GitHub] [flink-statefun] sjwiesman commented on pull request #159: [FLINK-19330][core] Move intialization logic to open() instead initializeState

2020-09-22 Thread GitBox


sjwiesman commented on pull request #159:
URL: https://github.com/apache/flink-statefun/pull/159#issuecomment-696412736


   Out of scope for this PR, but based on our offline conversation today it seems like there should be a remote e2e test with TM failures.






[GitHub] [flink] twalthr closed pull request #13425: [FLINK-19272][table-common] Add metadata interfaces for FLIP-107

2020-09-22 Thread GitBox


twalthr closed pull request #13425:
URL: https://github.com/apache/flink/pull/13425


   







[GitHub] [flink] flinkbot commented on pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

2020-09-22 Thread GitBox


flinkbot commented on pull request #13447:
URL: https://github.com/apache/flink/pull/13447#issuecomment-696485547











[GitHub] [flink-statefun] igalshilman commented on a change in pull request #152: [FLINK-19176] Add pluggable statefun payload serializer

2020-09-22 Thread GitBox


igalshilman commented on a change in pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#discussion_r492318196



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {

Review comment:
   Can this be final?

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String 
customPayloadSerializerClassName) {
+this.type = type;

Review comment:
   Can this be: `this.type = Objects.requireNonNull(type);`
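Since `MessageFactoryKey` is used as a serializable lookup key, a hedged sketch of how the suggested null-check could combine with the `equals`/`hashCode` such a key type typically needs (a simplified stand-in, not the actual PR code; `String` replaces `MessageFactoryType` here for self-containment):

```java
import java.io.Serializable;
import java.util.Objects;

// Sketch of a key type: non-null factory type enforced in the constructor,
// nullable serializer class name handled via Objects.equals/Objects.hash.
final class MessageFactoryKeySketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String type; // stands in for MessageFactoryType (assumption)
    private final String customPayloadSerializerClassName; // may be null

    MessageFactoryKeySketch(String type, String customPayloadSerializerClassName) {
        this.type = Objects.requireNonNull(type);
        this.customPayloadSerializerClassName = customPayloadSerializerClassName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MessageFactoryKeySketch)) return false;
        MessageFactoryKeySketch that = (MessageFactoryKeySketch) o;
        return type.equals(that.type)
                && Objects.equals(customPayloadSerializerClassName, that.customPayloadSerializerClassName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, customPayloadSerializerClassName);
    }
}
```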

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##
@@ -61,4 +64,26 @@ private static void 
validateParentFirstClassloaderPatterns(Configuration configu
 }
 return parentFirstClassloaderPatterns;
   }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration 
configuration) {
+
+MessageFactoryType factoryType =
+configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+String customPayloadSerializerClassName =
+
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+  if 
(StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {

Review comment:
   I think that you are right, let's leave that as it is  

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String 
customPayloadSerializerClassName) {
+this.type = type;
+this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+  }
+
+  public static MessageFactoryKey forType(
+  MessageFactoryType type, String customPayloadSerializerClassName) {
+return new MessageFactoryKey(type, customPayloadSerializerClassName);
+  }
+
+  public MessageFactoryType getType() {
+return this.type;
+  }
+
+  public String getCustomPayloadSerializerClassName() {

Review comment:
   That is a good point!
   In that case, what do you think about making this method return an `Optional<String>`,
   and having version 2 write an extra byte to indicate whether the optional is present?
   
   Something along the lines of:
   ```
   if (className.isPresent()) {
  out.writeBoolean(true);
  out.writeUtf8String(className.get());
   } else {
  out.writeBoolean(false);
   }
   ```
   
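The suggested presence-flag encoding can be sketched end-to-end as follows (a hedged illustration using plain `DataOutputStream`/`DataInputStream`; `writeUTF` stands in here for the project's own UTF-string helper, which is an assumption):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

// Sketch: version-2 encoding of an optional class name as a boolean presence
// flag followed by the string itself, with the matching read side.
final class OptionalStringCodec {
    static void write(DataOutputStream out, Optional<String> className) throws IOException {
        if (className.isPresent()) {
            out.writeBoolean(true);
            out.writeUTF(className.get());
        } else {
            out.writeBoolean(false);
        }
    }

    static Optional<String> read(DataInputStream in) throws IOException {
        // Read the presence flag first; only then is a string expected.
        return in.readBoolean() ? Optional.of(in.readUTF()) : Optional.empty();
    }
}
```

A round trip through a byte array shows that present and absent values deserialize back unchanged.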

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String 
customPayloadSerializerClassName) {
+this.type = type;
+this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+  }
+
+  public static MessageFactoryKey forType(
+  MessageFactoryType type, String customPayloadSerializerClassName) {
+return new MessageFactoryKey(type, customPayloadSerializerClassName);
+  }
+
+  public MessageFactoryType getType() {
+return this.type;
+  }
+
+  public String getCustomPayloadSerializerClassName() {

Review comment:
   > So the serialized format would already seem to be what you've requested! But if you'd prefer it get written without using `StringUtils.writeNullableString`, I'm happy to make that change.
   
   Oh I see. My original thinking here was to prevent a future NPE by someone 
who would expect (mistakenly) that 

[GitHub] [flink] wuchong commented on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join

2020-09-22 Thread GitBox


wuchong commented on pull request #13289:
URL: https://github.com/apache/flink/pull/13289#issuecomment-696662161











[GitHub] [flink] dawidwys commented on pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput

2020-09-22 Thread GitBox


dawidwys commented on pull request #13423:
URL: https://github.com/apache/flink/pull/13423#issuecomment-696628108


   Hi @pnowojski, could you take a look at the changes, as they touch some of the streaming runtime parts? Or maybe you could ask somebody else who works on the streaming runtime?







[GitHub] [flink] wuchong merged pull request #13442: [FLINK-19321][Table SQL / Runtime]CollectSinkFunction does not define serialVersionUID

2020-09-22 Thread GitBox


wuchong merged pull request #13442:
URL: https://github.com/apache/flink/pull/13442


   







[GitHub] [flink] zentol commented on a change in pull request #13438: [FLINK-19014][e2e] Increase startup timeout

2020-09-22 Thread GitBox


zentol commented on a change in pull request #13438:
URL: https://github.com/apache/flink/pull/13438#discussion_r492515807



##
File path: flink-dist/src/main/flink-bin/bin/flink-daemon.sh
##
@@ -88,15 +88,6 @@ out="${FLINK_LOG_PREFIX}.out"
 
 log_setting=("-Dlog.file=${log}" 
"-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" 
"-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" 
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
 
-JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version 
"\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

Review comment:
   Why is this being removed? The same code exists in flink-console.sh.
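For context, the removed `sed` line extracts a compact major/minor version token from the `java -version` banner. A hedged Java re-implementation of the same regex shows what it computes (the banner strings below are assumed examples, not taken from this PR):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: equivalent of sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/' applied to
// a `java -version` banner line: the first two dot-separated components of the
// quoted version string are concatenated (e.g. Java 8's "1.8.0_x" yields "18").
final class JavaVersionParser {
    static String majorMinor(String versionBanner) {
        Matcher m = Pattern.compile(".*version \"(.*)\\.(.*)\\..*\"").matcher(versionBanner);
        // sed leaves the line unchanged when the pattern does not match.
        return m.matches() ? m.group(1) + m.group(2) : versionBanner;
    }
}
```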









[GitHub] [flink] flinkbot commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


flinkbot commented on pull request #13453:
URL: https://github.com/apache/flink/pull/13453#issuecomment-696608065











[GitHub] [flink] twalthr commented on pull request #10354: [FLINK-14729][connectors] Multi-topics consuming from KafkaTableSource

2020-09-22 Thread GitBox


twalthr commented on pull request #10354:
URL: https://github.com/apache/flink/pull/10354#issuecomment-696785659


   @fangpengcheng95 Sorry for replying to this PR so late. I think the changes make sense. However, we have a new table source architecture after FLIP-95. Do you think you have time to update this PR?







[GitHub] [flink] wuchong commented on pull request #13332: [FLINK-19128][sql-client] Remove the runtime execution configuration in sql-client-defaults.yaml

2020-09-22 Thread GitBox


wuchong commented on pull request #13332:
URL: https://github.com/apache/flink/pull/13332#issuecomment-696672677


   Thanks @twalthr, will merge this then.







[GitHub] [flink] azagrebin commented on a change in pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint

2020-09-22 Thread GitBox


azagrebin commented on a change in pull request #13316:
URL: https://github.com/apache/flink/pull/13316#discussion_r492749100



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
##
@@ -206,9 +301,9 @@ public int hashCode() {
 
@JsonCreator
public GarbageCollectorInfo(
-   @JsonProperty(FIELD_NAME_NAME) String name,
-   @JsonProperty(FIELD_NAME_COUNT) long count,
-   @JsonProperty(FIELD_NAME_TIME) long time) {
+   @JsonProperty(FIELD_NAME_NAME) String name,

Review comment:
   formatting is off

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsInfo;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests the {@link TaskManagerDetailsHandler} implementation.
+ */
+public class TaskManagerDetailsHandlerTest extends TestLogger {
+
+   private static final Random random = new Random();
+
+   private ResourceID taskManagerId = ResourceID.generate();
+
+   private TestingResourceManagerGateway resourceManagerGateway;
+   private TaskManagerDetailsHandler testInstance;
+
+   private HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> handlerRequest;
+
+   public void initializeMetricStore(MetricStore metricStore) {
+   QueryScopeInfo.TaskManagerQueryScopeInfo tmScope = new 
QueryScopeInfo.TaskManagerQueryScopeInfo(taskManagerId.toString(), "Status");

Review comment:
   ```suggestion
private static void initializeMetricStore(MetricStore metricStore) {
QueryScopeInfo.TaskManagerQueryScopeInfo tmScope = new 
QueryScopeInfo.TaskManagerQueryScopeInfo(TASK_MANAGER_ID.toString(), "Status");
   ```

##
File path: 

[GitHub] [flink] rkhachatryan commented on a change in pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean

2020-09-22 Thread GitBox


rkhachatryan commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r492606259



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints
+ * yet to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+   private final AtomicInteger numberOfCheckpointsToClean;
+   private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+   public CheckpointsCleaner() {
+   this.numberOfCheckpointsToClean = new AtomicInteger(0);
+   }
+
+   int getNumberOfCheckpointsToClean() {
+   return numberOfCheckpointsToClean.get();
+   }
+
+   public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
+   numberOfCheckpointsToClean.incrementAndGet();
+   executor.execute(() -> {
+   try {
+   cleanAction.run();
+   } finally {
+   numberOfCheckpointsToClean.decrementAndGet();
+   postCleanAction.run();
+   }
+   });
+   }
+
+   public void cleanStates(Runnable postCleanAction, Map<OperatorID, OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor) {
+   numberOfCheckpointsToClean.incrementAndGet();
+   executor.execute(() -> {
+   // discard the private states.
+   // unregistered shared states are still considered private at this point.
+   try {
+   StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+   targetLocation.disposeOnFailure();

Review comment:
   Sorry, my reply got lost for some reason.
   
   I think that `operatorStates.clear();` should remain in `PendingCheckpoint` too, lest we break encapsulation.
   So the original Runnable can be submitted as `cleanAction` to `CheckpointCleaner.cleanCheckpoint`:
   ```
   try {
   StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
   targetLocation.disposeOnFailure();
   } catch (Throwable t) {
    LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.",
        checkpointId, jobId, t);
   } finally {
   operatorStates.clear();
   }
   ```
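The pattern being discussed — increment a counter before submitting the clean action to an executor, decrement it in a `finally` block — can be sketched with plain JDK types. This is an illustrative, self-contained sketch (names follow the diff above, but it is not the PR's code):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the counting-cleaner pattern: the counter is incremented
// before handing the clean action to the executor and decremented in a
// finally block, so the number of checkpoints still being cleaned is always
// observable.
public class CheckpointsCleanerSketch {
    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger(0);

    int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }

    public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
        numberOfCheckpointsToClean.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                // the counter is decremented and postCleanAction runs even if cleanAction throws
                numberOfCheckpointsToClean.decrementAndGet();
                postCleanAction.run();
            }
        });
    }

    public static void main(String[] args) {
        CheckpointsCleanerSketch cleaner = new CheckpointsCleanerSketch();
        // Runnable::run is a direct (same-thread) Executor, so the cleanup
        // has already completed when execute() returns.
        cleaner.cleanCheckpoint(() -> {}, () -> {}, Runnable::run);
        System.out.println(cleaner.getNumberOfCheckpointsToClean()); // prints 0
    }
}
```

With a same-thread executor the counter is back to zero immediately; with an asynchronous executor it reflects how many cleanups are still in flight, which is the quantity a backpressure check can consult.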


[GitHub] [flink] lirui-apache commented on a change in pull request #13434: [FLINK-19292][hive] HiveCatalog should support specifying Hadoop conf dir with configuration

2020-09-22 Thread GitBox


lirui-apache commented on a change in pull request #13434:
URL: https://github.com/apache/flink/pull/13434#discussion_r492720329



##
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
##
@@ -57,10 +59,10 @@
public ExpectedException expectedException = ExpectedException.none();
 
@Test
-   public void test() {
+   public void test() throws IOException {

Review comment:
   Besides, we need to add a test case for the hadoop conf dir 
configuration.

##
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
##
@@ -57,10 +59,10 @@
public ExpectedException expectedException = ExpectedException.none();
 
@Test
-   public void test() {
+   public void test() throws IOException {

Review comment:
   Create a new test case for this

##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##
@@ -199,7 +207,7 @@ private static HiveConf createHiveConf(@Nullable String 
hiveConfDir) {
Configuration hadoopConf = 
HadoopUtils.getHadoopConfiguration(new 
org.apache.flink.configuration.Configuration());
 
// Add mapred-site.xml. We need to read configurations like 
compression codec.
-   for (String possibleHadoopConfPath : 
HadoopUtils.possibleHadoopConfPaths(new 
org.apache.flink.configuration.Configuration())) {
+   for (String possibleHadoopConfPath : 
HadoopUtils.possibleHadoopConfPaths(new 
org.apache.flink.configuration.Configuration(), hadoopConfDir)) {

Review comment:
   Let's not modify `HadoopUtils`. Instead, if `hadoopConfDir` is not null, set `ConfigConstants.PATH_HADOOP_CONFIG` in the `Configuration` instance.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13445:
URL: https://github.com/apache/flink/pull/13445#issuecomment-696389960







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai commented on pull request #157: [FLINK-19327][k8s] Bump JobManager heap size to 1 GB

2020-09-22 Thread GitBox


tzulitai commented on pull request #157:
URL: https://github.com/apache/flink-statefun/pull/157#issuecomment-696560584


   Merging ...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] carp84 commented on a change in pull request #13405: [FLINK-19270] Extract an inteface from AbstractKeyedStateBackend

2020-09-22 Thread GitBox


carp84 commented on a change in pull request #13405:
URL: https://github.com/apache/flink/pull/13405#discussion_r492641148



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##
@@ -282,8 +283,8 @@ public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
-   @SuppressWarnings("unchecked,rawtypes")
-   AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
+   @SuppressWarnings("rawtypes")
+   CheckpointableKeyedStateBackend rawBackend = keyedStateBackend;
 
rawBackend.setCurrentKey(key);

Review comment:
   The suggested change looks ok in my local IntelliJ, could you double 
check? Thanks. :-)

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##
@@ -282,8 +283,8 @@ public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
-   @SuppressWarnings("unchecked,rawtypes")
-   AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
+   @SuppressWarnings("rawtypes")
+   CheckpointableKeyedStateBackend rawBackend = keyedStateBackend;
 
rawBackend.setCurrentKey(key);

Review comment:
   True, but that's why we need to suppress the "unchecked" warning, rather 
than "rawtypes"? (In my local env it will also show the warning if I remove the 
"unchecked" suppression from the method signature)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13450:
URL: https://github.com/apache/flink/pull/13450#issuecomment-696552930







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13452: [FLINK-19273][sql-parser] Support METADATA syntax in SQL parser

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13452:
URL: https://github.com/apache/flink/pull/13452#issuecomment-696611770







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


XComp commented on a change in pull request #13453:
URL: https://github.com/apache/flink/pull/13453#discussion_r492673612



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/slots/ResourceRequirement.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.slots;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Represents the number of required resource for a specific {@link 
ResourceProfile}.

Review comment:
   Being picky here since it's the initial commit. 8)
   ```suggestion
* Represents the number of required resources for a specific {@link 
ResourceProfile}.
   ```
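The class under review is a serializable value type pairing a resource profile with a required count. Here is a hedged, self-contained sketch of that shape — `String` stands in for Flink's `ResourceProfile`, and the field name `numberOfRequiredSlots` is an assumption, not taken from the PR:

```java
import java.io.Serializable;
import java.util.Objects;

// Sketch of a ResourceRequirement-style value class: the number of required
// resources for a specific profile, with value semantics (equals/hashCode).
public final class ResourceRequirementSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String resourceProfile;      // stand-in for ResourceProfile
    private final int numberOfRequiredSlots;   // assumed field name

    public ResourceRequirementSketch(String resourceProfile, int numberOfRequiredSlots) {
        if (numberOfRequiredSlots <= 0) {
            throw new IllegalArgumentException("numberOfRequiredSlots must be positive");
        }
        this.resourceProfile = Objects.requireNonNull(resourceProfile);
        this.numberOfRequiredSlots = numberOfRequiredSlots;
    }

    public String getResourceProfile() { return resourceProfile; }
    public int getNumberOfRequiredSlots() { return numberOfRequiredSlots; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ResourceRequirementSketch)) return false;
        ResourceRequirementSketch that = (ResourceRequirementSketch) o;
        return numberOfRequiredSlots == that.numberOfRequiredSlots
            && resourceProfile.equals(that.resourceProfile);
    }

    @Override
    public int hashCode() {
        return Objects.hash(resourceProfile, numberOfRequiredSlots);
    }

    public static void main(String[] args) {
        ResourceRequirementSketch a = new ResourceRequirementSketch("cpu=1", 2);
        ResourceRequirementSketch b = new ResourceRequirementSketch("cpu=1", 2);
        System.out.println(a.equals(b)); // prints true
    }
}
```

Value semantics matter here because such requirements are compared and aggregated by the resource manager, so two requirements for the same profile and count must be interchangeable.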





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rmetzger commented on a change in pull request #13438: [FLINK-19014][e2e] Increase startup timeout

2020-09-22 Thread GitBox


rmetzger commented on a change in pull request #13438:
URL: https://github.com/apache/flink/pull/13438#discussion_r492646461



##
File path: flink-dist/src/main/flink-bin/bin/flink-daemon.sh
##
@@ -88,15 +88,6 @@ out="${FLINK_LOG_PREFIX}.out"
 
 log_setting=("-Dlog.file=${log}" 
"-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" 
"-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" 
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
 
-JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

Review comment:
   Damn, I need to work on my `git grep` skills  
   
   I removed it because the java version was only used to check whether the version is less than 1.8. Since it can never be less than 1.8, we don't need this check (anymore).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…

2020-09-22 Thread GitBox


flinkbot commented on pull request #13455:
URL: https://github.com/apache/flink/pull/13455#issuecomment-696719238







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-697012704







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13289:
URL: https://github.com/apache/flink/pull/13289#issuecomment-683894616







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] leonardBang commented on pull request #13128: [FLINK-18795][hbase] Support for HBase 2

2020-09-22 Thread GitBox


leonardBang commented on pull request #13128:
URL: https://github.com/apache/flink/pull/13128#issuecomment-696498920


   One more tip: it would be great if we could add an e2e test for `hbase 2`, just like Kafka has e2e tests for multiple versions; we can keep the `hbase 2` test once we decide to deprecate `hbase 1`.
   
   
https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/resources/hbase_e2e.sql#L22
   
   
https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_e2e.sql#L23
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13446: [FLINK-18779]Support the SupportsFilterPushDown for LookupTableSource

2020-09-22 Thread GitBox


flinkbot commented on pull request #13446:
URL: https://github.com/apache/flink/pull/13446#issuecomment-696484481







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu merged pull request #13420: [FLINK-19229][python] Introduce the PythonStreamGroupAggregateOperator for Python UDAF.

2020-09-22 Thread GitBox


dianfu merged pull request #13420:
URL: https://github.com/apache/flink/pull/13420


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on pull request #13440: [FLINK-18725][e2e] Use ClusterIP instead of NodePort and remove query port in internal jobmanager service

2020-09-22 Thread GitBox


wangyang0918 commented on pull request #13440:
URL: https://github.com/apache/flink/pull/13440#issuecomment-696452209


   cc @rmetzger, could you please have a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu commented on a change in pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator

2020-09-22 Thread GitBox


dianfu commented on a change in pull request #13451:
URL: https://github.com/apache/flink/pull/13451#discussion_r492600107



##
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##
@@ -44,12 +44,33 @@ message UserDefinedFunction {
   // 2. The result of another user-defined function
   // 3. The constant value of the column
   repeated Input inputs = 2;
+
+  // Used in pandas batch over window aggregation
+  int32 windowIndex = 3;
 }
 
 // A list of user-defined functions to be executed in a batch.
 message UserDefinedFunctions {
   repeated UserDefinedFunction udfs = 1;
   bool metric_enabled = 2;
+  repeated Window windows = 3;
+}
+
+// Used to describe the info of over window in pandas batch over window 
aggregation
+message Window {

Review comment:
   What about renaming it to OverWindow?

##
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Batch Arrow Python {@link AggregateFunction} Operator for Over Window Aggregation.
+ */
+@Internal
+public class BatchArrowPythonOverWindowAggregateFunctionOperator
+   extends AbstractBatchArrowPythonAggregateFunctionOperator {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final String SCHEMA_OVER_WINDOW_ARROW_CODER_URN = "flink:coder:schema:over_window_arrow:v1";
+
+   private static final String PANDAS_BATCH_OVER_WINDOW_AGG_FUNCTION_URN = "flink:transform:batch_over_window_aggregate_function:arrow:v1";
+
+   /**
+* Used to serialize the boundary of range window.
+*/
+   private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;
+
+   /**
+* Window lower boundary. e.g. Long.MIN_VALUE means unbounded preceding.
+*/
+   private final long[] lowerBoundary;
+
+   /**
+* Window upper boundary. e.g. Long.MAX_VALUE means unbounded following.
+*/
+   private final long[] upperBoundary;
+
+   /**
+* Whether the specified position window is a range window.
+*/
+   private final boolean[] isRangeWindows;
+
+   /**
+* The window index of the specified aggregate function belongs to.
+*/
+   private final int[] aggWindowIndex;
+
+   /**
+* The row time index of the input data.
+*/
+   private final int inputTimeFieldIndex;
+
+   /**
+* The order of row time. True for ascending.
+*/
+   private final boolean order;
+
+   /**
+* The type serializer for the forwarded fields.
+*/
+   private transient RowDataSerializer forwardedInputSerializer;
+
+   /**
+* Stores the start position of the last key data in 
forwardedInputQueue.
+*/
+   private transient int lastKeyDataStartPos;
+
+   /**
+* Reusable OutputStream used to hold the window boundary with input elements.
+*/
+   private transient ByteArrayOutputStreamWithPos windowBoundaryWithDataBaos;
+
+   /**
+* OutputStream Wrapper.
+*/
+   private transient DataOutputViewStreamWrapper 

[GitHub] [flink] dawidwys commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput

2020-09-22 Thread GitBox


dawidwys commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r492618028



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
public void setKeyContextElement(StreamRecord record) throws Exception {
owner.internalSetKeyContextElement(record, stateKeySelector);
}
+
+   @Override
+   public void endInput() throws Exception {
+   if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   It shouldn't. Nevertheless I think it is not safe to just assume so.

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
##
@@ -107,7 +106,11 @@ public void run(final Object lockingObject,
// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
// so we still need the following call to end the input
synchronized (lockingObject) {
-   operatorChain.endMainOperatorInput(1);
+   if (this instanceof BoundedOneInput) {
+   ((BoundedOneInput) this).endInput();
+   } else if (this instanceof BoundedMultiInput) {
+   ((BoundedMultiInput) this).endInput(1);

Review comment:
   Sorry, I am not sure if I understand the comment.
   
   Does a StreamSource even have inputs? Of course I might be missing something 
here, but isn't the input id, the id of an input that has finished? Honestly, I 
find this part of code quite shady (as there are actually no inputs that can 
finish, imo), but I think the logic pre and post this change is the same. Again 
I might be wrong.

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
##
@@ -107,7 +106,11 @@ public void run(final Object lockingObject,
// in theory, the subclasses of StreamSource 
may implement the BoundedOneInput interface,
// so we still need the following call to end 
the input
 

[GitHub] [flink] aljoscha commented on a change in pull request #13452: [FLINK-19273][sql-parser] Support METADATA syntax in SQL parser

2020-09-22 Thread GitBox


aljoscha commented on a change in pull request #13452:
URL: https://github.com/apache/flink/pull/13452#discussion_r492584757



##
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
##
@@ -115,9 +115,10 @@
 * ALL - a shortcut to change the default merging strategy if 
none provided
 * CONSTRAINTS - constraints such as primary and unique 
keys
 * GENERATED - computed columns
+* METADATA - metadata columns
 * WATERMARKS - watermark declarations
 * PARTITIONS - partition of the tables
-* OPTIONS - connector options that decribed connector and 
format properties
+* OPTIONS - connector options that described connector and 
format properties

Review comment:
   ```suggestion
 * OPTIONS - connector options that describe connector and 
format properties
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu closed pull request #13230: [FLINK-18950][python][docs] Add documentation for Operations in Python DataStream API.

2020-09-22 Thread GitBox


dianfu closed pull request #13230:
URL: https://github.com/apache/flink/pull/13230


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join

2020-09-22 Thread GitBox


wuchong commented on a change in pull request #13289:
URL: https://github.com/apache/flink/pull/13289#discussion_r492517117



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
##
@@ -163,7 +163,35 @@ class FlinkPlannerImpl(
 sqlValidator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]),
 cluster,
 convertletTable,
-sqlToRelConverterConfig)
+sqlToRelConverterConfig) {
+// override convertFrom() to support flexible Temporal Table Syntax,
+// this can be revert once FLINK-16579(Upgrade Calcite version to 
1.23) resolved.
+val relBuilder = config.getRelBuilderFactory.create(cluster, null)
+
+override def convertFrom(bb: SqlToRelConverter#Blackboard, from: SqlNode): Unit = {

Review comment:
   Create a JIRA issue to remove this overriding once we bump up the Calcite version, and add a comment above this method with the JIRA id.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB

2020-09-22 Thread GitBox


flinkbot commented on pull request #13445:
URL: https://github.com/apache/flink/pull/13445#issuecomment-696385131







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] gaoyunhaii commented on a change in pull request #13357: [FLINK-19165] Refactor the UnilateralSortMerger

2020-09-22 Thread GitBox


gaoyunhaii commented on a change in pull request #13357:
URL: https://github.com/apache/flink/pull/13357#discussion_r492529818



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CircularQueues.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Collection of queues that are used for the communication between the 
threads.
+ */
+final class CircularQueues<E> implements StageRunner.StageMessageDispatcher<E> {
+   private final BlockingQueue<CircularElement<E>> empty;
+   private final BlockingQueue<CircularElement<E>> sort;
+   private final BlockingQueue<CircularElement<E>> spill;
+
+   private boolean isFinished = false;

Review comment:
   Very sorry, one more issue just came to me: we might need to make 
`isFinished` `volatile`. `CircularQueues#close()` might be called from 
`SpilledThread` or `SortThread`, and we might need to read this variable in 
the task main thread. Since `ExternalSorter#close()` guarantees that there is 
no concurrent writer, a `volatile` field is sufficient here and atomic 
variables are not needed.
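   To make the point concrete, here is a minimal, hypothetical sketch (not Flink code; the class and method names are invented for illustration) of the single-writer / cross-thread-reader pattern being discussed: one thread sets a `volatile` flag in `close()`, and another thread reads it. With only one writer, `volatile` already guarantees visibility, so no atomic compare-and-set is required:

```java
/**
 * Minimal sketch of a volatile "finished" flag with a single writer.
 * Hypothetical illustration only; not the actual CircularQueues code.
 */
public class VolatileFlagSketch {
    // volatile guarantees the write in close() is visible to other threads
    private volatile boolean isFinished = false;

    public void close() {          // e.g. called from a spill or sort thread
        isFinished = true;
    }

    public boolean isFinished() {  // e.g. read from the task main thread
        return isFinished;
    }

    public static void main(String[] args) throws Exception {
        VolatileFlagSketch queues = new VolatileFlagSketch();
        Thread closer = new Thread(queues::close);
        closer.start();
        closer.join();
        // After join(), the main thread is guaranteed to see the write.
        System.out.println(queues.isFinished()); // prints "true"
    }
}
```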









[GitHub] [flink] flinkbot commented on pull request #13449: [FLINK-19282][table sql/planner]Supports watermark push down with Wat…

2020-09-22 Thread GitBox


flinkbot commented on pull request #13449:
URL: https://github.com/apache/flink/pull/13449#issuecomment-696505414











[GitHub] [flink] StephanEwen commented on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-22 Thread GitBox


StephanEwen commented on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-696662169


   Just leaving a quick comment here that the new sinks that @aljoscha and 
@guoweiM are working on will be different and not affected by this. So this 
here is more of a temporary fix.







[GitHub] [flink] dianfu closed pull request #13421: [FLINK-19186][python] Add Python building blocks to make sure the basic functionality of Pandas Batch Group Aggregation could work

2020-09-22 Thread GitBox


dianfu closed pull request #13421:
URL: https://github.com/apache/flink/pull/13421


   







[GitHub] [flink] flinkbot edited a comment on pull request #13216: [FLINK-18999][table-planner-blink][hive] Temporary generic table does…

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13216:
URL: https://github.com/apache/flink/pull/13216#issuecomment-678268420











[GitHub] [flink] dawidwys commented on pull request #13405: [FLINK-19270] Extract an inteface from AbstractKeyedStateBackend

2020-09-22 Thread GitBox


dawidwys commented on pull request #13405:
URL: https://github.com/apache/flink/pull/13405#issuecomment-696625513


   Could you have another look @carp84 @aljoscha ?







[GitHub] [flink] klion26 commented on a change in pull request #13003: [FLINK-18737][docs]translate jdbc connector

2020-09-22 Thread GitBox


klion26 commented on a change in pull request #13003:
URL: https://github.com/apache/flink/pull/13003#discussion_r492695512



##
File path: docs/dev/connectors/jdbc.zh.md
##
@@ -38,12 +38,12 @@ To use it, add the following dependency to your project 
(along with your JDBC-dr
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently __NOT__ part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/project-configuration.html).
+注意连接器目前还 __不是__ 二进制发行版的一部分,如何在集群中运行请参考 [这里]({% link 
dev/project-configuration.zh.md %})。

Review comment:
   Would `注意该连接器` read a bit better than `注意连接器`?
   The current wording is slightly awkward to read.

##
File path: docs/dev/connectors/jdbc.zh.md
##
@@ -64,4 +66,4 @@ env
 env.execute();
 {% endhighlight %}
 
-Please refer to the [API documentation]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
+更多细节请查看 [API documentation]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) 。

Review comment:
   Perhaps you could try `({% link 
/api/java/org/apache/flink/connector/jdbc/JdbcSink.html %})` 









[GitHub] [flink] gaoyunhaii commented on a change in pull request #13377: [FLINK-18592] bugfix for StreamingFileSink

2020-09-22 Thread GitBox


gaoyunhaii commented on a change in pull request #13377:
URL: https://github.com/apache/flink/pull/13377#discussion_r492593704



##
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final 
FileSystem fs, final Path p
}
isClosed = dfs.isFileClosed(path);
}
+   // [FLINK-18592] recover lease after the lease timeout passed 
but file was still not closed
+   if(!isClosed && !deadline.hasTimeLeft()){
+   recoverLease(path, dfs);

Review comment:
   I think we might merge this process with the original lease-recovery 
logic. 

##
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final 
FileSystem fs, final Path p
}
isClosed = dfs.isFileClosed(path);
}
+   // [FLINK-18592] recover lease after the lease timeout passed 
but file was still not closed
+   if(!isClosed && !deadline.hasTimeLeft()){
+   recoverLease(path, dfs);
+   }
return isClosed;
}
+
+
+   /*
+* Run the dfs recoverLease. recoverLease is asynchronous. It returns:
+*   - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+*   - true when the lease recovery has succeeded or the file is closed.
+*
+* But, we have to be careful. Each time we call recoverLease, it 
starts the recover lease process over from the beginning. We could put 
ourselves in a situation
+* where we are doing nothing but starting a recovery, interrupting it 
to start again, and so on.
+*
+* The namenode will try to recover the lease on the file's primary 
node. If all is well, it should return near immediately.
+* But, as is common, it is the very primary node that has crashed and 
so the namenode will be stuck waiting on a socket timeout before it will ask 
another datanode to start the recovery.
+* It does not help if we call recoverLease in the meantime and in 
particular, subsequent to the socket timeout, a recoverLease invocation will 
cause us to start over from square one
+* (possibly waiting on socket timeout against primary node).
+* So, in the below, we do the following:
+* 1. Call recoverLease.
+* 2. If it returns true, break.
+* 3. If it returns false, wait a few seconds and then call it again.
+* 4. If it returns true, break.
+* 5. If it returns false, wait for what we think the datanode socket 
timeout is (configurable) and then try again.
+* 6. If it returns true, break.
+* 7. If it returns false, repeat starting at step 5. above. If 
HDFS-4525 is available, call it every second and we might be able to exit early.
+*/
+   private static boolean recoverLease(Path path, DistributedFileSystem 
dfs) throws IOException {
+   LOG.info("Recover lease on dfs file " + path);
+   long startWaiting = System.currentTimeMillis();
+   // Default is 15 minutes. It's huge, but the idea is that if we 
have a major issue, HDFS
+   // usually needs 10 minutes before marking the nodes as dead. 
So we're putting ourselves
+   // beyond that limit 'to be safe'.
+   //Configuration conf = dfs.getConf();
+   long recoveryTimeout = HdfsConstants.LEASE_HARDLIMIT_PERIOD / 4;
+   long recoveryTargetTimeout = recoveryTimeout + startWaiting;
+   // This setting should be a little bit above what the cluster 
dfs heartbeat is set to.
+   long firstPause = 4000L;
+   long pause = 1000L;
+   // This should be set to how long it'll take for us to time out against
+   // the primary datanode if it is dead. We set it to 64 seconds, 4 seconds
+   // more than the default READ_TIMEOUT in HDFS, the default value for
+   // DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
+   // timeout, then further recovery will take linear backoff with this base,
+   // to avoid endless preemptions when this value is not properly configured.
+   long subsequentPauseBase = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+
+   Method isFileClosedMeth = null;
+   // whether we need to look for isFileClosed method
+   boolean findIsFileClosedMeth = true;
+   boolean recovered = false;
+   // We break the loop if we succeed the lease recovery, timeout, 
or we throw an exception.
+   for 
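   The retry schedule described in steps 1-7 of the comment above can be condensed into a short sketch. This is a hypothetical illustration only: `Recoverer`, `tryRecover`, and `recoverWithBackoff` are invented names standing in for the PR's actual `recoverLease` call, and the pauses are parameters rather than the HDFS-derived constants used in the real code:

```java
/**
 * Condensed sketch of the lease-recovery retry schedule (steps 1-7):
 * one immediate attempt, one retry after a short pause, then repeated
 * retries on a longer pause until a time budget runs out.
 * Hypothetical illustration; not the PR's actual implementation.
 */
public class LeaseRecoverySketch {

    /** Stand-in for the asynchronous DistributedFileSystem#recoverLease call. */
    public interface Recoverer {
        boolean tryRecover();
    }

    public static boolean recoverWithBackoff(
            Recoverer recoverer, long firstPauseMs, long laterPauseMs, long budgetMs) {
        long start = System.currentTimeMillis();
        try {
            if (recoverer.tryRecover()) {           // steps 1-2: first call, break on success
                return true;
            }
            Thread.sleep(firstPauseMs);             // step 3: wait a few seconds
            if (recoverer.tryRecover()) {           // step 4: second attempt
                return true;
            }
            while (System.currentTimeMillis() - start < budgetMs) {
                Thread.sleep(laterPauseMs);         // steps 5-7: longer pause, repeat
                if (recoverer.tryRecover()) {
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // preserve the interrupt flag
        }
        return false;                               // budget exhausted or interrupted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Recoverer flaky = () -> ++calls[0] >= 3;    // succeeds on the third attempt
        System.out.println(recoverWithBackoff(flaky, 10L, 10L, 1000L)); // prints "true"
    }
}
```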

[GitHub] [flink-statefun] tzulitai commented on pull request #159: [FLINK-19330][core] Move intialization logic to open() instead initializeState

2020-09-22 Thread GitBox


tzulitai commented on pull request #159:
URL: https://github.com/apache/flink-statefun/pull/159#issuecomment-696546718


   I think we can easily combine the exactly-once and remote module E2E tests.







[GitHub] [flink] wuchong merged pull request #13426: [FLINK-19244] CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-22 Thread GitBox


wuchong merged pull request #13426:
URL: https://github.com/apache/flink/pull/13426


   







[GitHub] [flink] wangxlong commented on pull request #11125: [FLINK-16147][Flink-Table-Common] WatermarkSepc#toString contain watermarkExprOutputType field

2020-09-22 Thread GitBox


wangxlong commented on pull request #11125:
URL: https://github.com/apache/flink/pull/11125#issuecomment-697035565


   @twalthr Thanks







[GitHub] [flink] flinkbot edited a comment on pull request #13416: [FLINK-19179] Extend managed memory fraction calculation for various use cases.

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13416:
URL: https://github.com/apache/flink/pull/13416#issuecomment-694678006











[GitHub] [flink] StephanEwen commented on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x

2020-09-22 Thread GitBox


StephanEwen commented on pull request #13450:
URL: https://github.com/apache/flink/pull/13450#issuecomment-696649675


   Looks good to me.
   
   Is this also relevant for other Avro parts, like AvroInputFormat, 
AvroSerializer, ... ?







[GitHub] [flink] twalthr commented on pull request #11125: [FLINK-16147][Flink-Table-Common] WatermarkSepc#toString contain watermarkExprOutputType field

2020-09-22 Thread GitBox


twalthr commented on pull request #11125:
URL: https://github.com/apache/flink/pull/11125#issuecomment-696793203


   I will merge this for now. But we might rework this part soon again when 
touching `TableSchema`.







[GitHub] [flink] flinkbot edited a comment on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13448:
URL: https://github.com/apache/flink/pull/13448#issuecomment-696496421











[GitHub] [flink-playgrounds] alpinegizmo commented on a change in pull request #16: [FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground.

2020-09-22 Thread GitBox


alpinegizmo commented on a change in pull request #16:
URL: https://github.com/apache/flink-playgrounds/pull/16#discussion_r492952901



##
File path: pyflink-walkthrough/README.md
##
@@ -102,6 +106,24 @@ $ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/pa
 
 3. Stop the PyFlink job:
 
-Visit the Flink Web UI at 
[http://localhost:8081/#/overview](http://localhost:8081/#/overview) , select 
the job and click `Cancle` on the upper right side.
+Visit the Flink Web UI at 
[http://localhost:8081/#/overview](http://localhost:8081/#/overview) , select 
the job and click `Cancel` on the upper right side.
 
 ![image](pic/cancel.png)
+
+## Extension
+
+You are able to edit the 
[payment_msg_processing.py](payment_msg_proccessing.py) or create new PyFlink 
+projects to perform more complex processing logic locally on your operating 
system under the `pyflink-walkthrough` 
+directory since it is mounted on the `jobmanager` docker container. Such as:
+* Creating a new Kafka source table;
+* Creating a new index for the Elasticsearch sink;
+* Calculating the amount of transactions grouped by a 1 minute tumble window 
and payPlatforms.
+
+After the modification, you can submit the new job by executing the same 
command mentioned at 
+[Running the PyFlink Job](#running-the-pyflink-job)
+```shell script
+$ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/payment_msg_proccessing.py -d
+```
+
+Furthermore, you can also [create new kibana 
dashboards](https://www.elastic.co/guide/en/kibana/7.8/dashboard-create-new-dashboard.html)
 
+to visualize more charts of various dimension based on the persistent indexes 
in Elasticsearch.

Review comment:
   ```suggestion
   ## Further Explorations
   
   If you would like to explore this example more deeply, you can edit 
[payment_msg_processing.py](payment_msg_proccessing.py) or create new PyFlink 
   projects that perform more complex processing. You can do this locally under 
the `pyflink-walkthrough` 
   directory, since it is mounted on the `jobmanager` docker container. 
   
   Ideas:
   * Add your own Kafka source table;
   * Create a new index for the Elasticsearch sink;
   * Count the number of transactions, grouped by a 1 minute tumbling windows, 
and by payPlatform.
   
   After making a modification, you can submit the new job by executing the 
same command mentioned at 
   [Running the PyFlink Job](#running-the-pyflink-job)
   ```bash
   $ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/payment_msg_proccessing.py -d
   ```
   
   Furthermore, you can also [create new kibana 
dashboards](https://www.elastic.co/guide/en/kibana/7.8/dashboard-create-new-dashboard.html)
 
   that visualize other aspects of the data in the Elasticsearch.
   ```









[GitHub] [flink] flinkbot commented on pull request #13454: [FLINK-19304][coordination] Add FLIP-138 feature toggle

2020-09-22 Thread GitBox


flinkbot commented on pull request #13454:
URL: https://github.com/apache/flink/pull/13454#issuecomment-696661067











[GitHub] [flink] xintongsong commented on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.

2020-09-22 Thread GitBox


xintongsong commented on pull request #13448:
URL: https://github.com/apache/flink/pull/13448#issuecomment-696541373











[GitHub] [flink] sjwiesman commented on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB

2020-09-22 Thread GitBox


sjwiesman commented on pull request #13445:
URL: https://github.com/apache/flink/pull/13445#issuecomment-696707924


   Thanks for pointing that out. 







[GitHub] [flink] flinkbot edited a comment on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)

2020-09-22 Thread GitBox


flinkbot edited a comment on pull request #13453:
URL: https://github.com/apache/flink/pull/13453#issuecomment-696623367










