[jira] [Created] (FLINK-17715) Supports create function in SQL-CLI

2020-05-14 Thread Danny Chen (Jira)
Danny Chen created FLINK-17715:
--

 Summary: Supports create function in SQL-CLI
 Key: FLINK-17715
 URL: https://issues.apache.org/jira/browse/FLINK-17715
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Danny Chen
 Fix For: 1.11.0
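
For illustration, a minimal sketch of the kind of statement involved; the ticket is about accepting this DDL directly in the SQL CLI, which today configures functions through other means instead. Function, class, and table names below are placeholders, and the executeSql call assumes the 1.11 Table API:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFunctionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // The DDL that the SQL CLI should accept directly.
        tEnv.executeSql("CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper' LANGUAGE JAVA");
        // Placeholder query using the registered function.
        tEnv.executeSql("SELECT my_upper(name) FROM my_table").print();
    }
}
{code}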






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17596) Unstable PyFlinkBlinkStreamUserDefinedFunctionTests testMethod#test_udf_in_join_condition_2

2020-05-14 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-17596:

Labels: pull-request-available test-stability  (was: pull-request-available)

> Unstable PyFlinkBlinkStreamUserDefinedFunctionTests 
> testMethod#test_udf_in_join_condition_2
> ---
>
> Key: FLINK-17596
> URL: https://issues.apache.org/jira/browse/FLINK-17596
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.11.0
>Reporter: Yun Tang
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> Unlike FLINK-14865, this failure of 
> PyFlinkBlinkStreamUserDefinedFunctionTests 
> testMethod#test_udf_in_join_condition_2 is caused by a "no such file" IO 
> exception.
> instance link: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=826=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=455fddbf-5921-5b71-25ac-92992ad80b28
> {code:java}
>  Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
>   at 
> org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
>   at 
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
>   at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:142)
>   at 
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:288)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:459)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:455)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:475)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:713)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:539)
>   at java.lang.Thread.run(Thread.java:748)
>  Caused by: 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.io.IOException: Cannot run program 
> "/tmp/python-dist-38f0c31e-9801-4d53-9694-2e16fd4b2fc5/pyflink-udf-runner.sh":
>  error=2, No such file or directory
>   at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:331)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:320)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250)
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195)
>   ... 16 more
>  Caused by: java.io.IOException: Cannot run program 
> "/tmp/python-dist-38f0c31e-9801-4d53-9694-2e16fd4b2fc5/pyflink-udf-runner.sh":
>  error=2, No such file or directory
>   at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>   at 
> org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:144)
>   at 
> org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:119)
>   at 
> 

[GitHub] [flink] wangyang0918 commented on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-05-14 Thread GitBox


wangyang0918 commented on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-629044350


   @zentol I have updated the PR according to the discussion. We will start 
with the `.log` file using a rolling appender. I think it meets most of the 
requirements, and we can leave the open question about the `.out` and `.err` files for later.
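
   For illustration, the rolling appender section could look roughly like the following (log4j2 properties syntax; the pattern, size limit, and max file count are placeholders, not necessarily what this PR ends up with):

   ```
   appender.rolling.name = RollingFileAppender
   appender.rolling.type = RollingFile
   appender.rolling.fileName = ${sys:log.file}
   appender.rolling.filePattern = ${sys:log.file}.%i
   appender.rolling.layout.type = PatternLayout
   appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
   appender.rolling.policies.type = Policies
   appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
   appender.rolling.policies.size.size = 100MB
   appender.rolling.strategy.type = DefaultRolloverStrategy
   appender.rolling.strategy.max = 10
   rootLogger.appenderRef.rolling.ref = RollingFileAppender
   ```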



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-17596) Unstable PyFlinkBlinkStreamUserDefinedFunctionTests testMethod#test_udf_in_join_condition_2

2020-05-14 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-17596.
---
Resolution: Fixed

Merged to master via 282da0dd3e2def9e0ea6693f953d2733eb663e70

> Unstable PyFlinkBlinkStreamUserDefinedFunctionTests 
> testMethod#test_udf_in_join_condition_2
> ---
>
> Key: FLINK-17596
> URL: https://issues.apache.org/jira/browse/FLINK-17596
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.11.0
>Reporter: Yun Tang
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Unlike FLINK-14865, this failure of 
> PyFlinkBlinkStreamUserDefinedFunctionTests 
> testMethod#test_udf_in_join_condition_2 is caused by a "no such file" IO 
> exception.
> instance link: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=826=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=455fddbf-5921-5b71-25ac-92992ad80b28
> {code:java}
>  Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
>   at 
> org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
>   at 
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
>   at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:142)
>   at 
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
>   at 
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:288)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:459)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:455)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:475)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:713)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:539)
>   at java.lang.Thread.run(Thread.java:748)
>  Caused by: 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.io.IOException: Cannot run program 
> "/tmp/python-dist-38f0c31e-9801-4d53-9694-2e16fd4b2fc5/pyflink-udf-runner.sh":
>  error=2, No such file or directory
>   at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:331)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:320)
>   at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250)
>   at 
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195)
>   ... 16 more
>  Caused by: java.io.IOException: Cannot run program 
> "/tmp/python-dist-38f0c31e-9801-4d53-9694-2e16fd4b2fc5/pyflink-udf-runner.sh":
>  error=2, No such file or directory
>   at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>   at 
> org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:144)
>   at 
> org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:119)
>   at 
> 

[GitHub] [flink] hequn8128 commented on pull request #12160: [FLINK-17710][python][hotfix] StreamSqlTests.test_execute_sql test is…

2020-05-14 Thread GitBox


hequn8128 commented on pull request #12160:
URL: https://github.com/apache/flink/pull/12160#issuecomment-629043705


   @rmetzger Thank you for your help.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu closed pull request #12092: [FLINK-17596][python] Move the Python UDF runner script out of the jar of flink-python

2020-05-14 Thread GitBox


dianfu closed pull request #12092:
URL: https://github.com/apache/flink/pull/12092


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17706) Clarify licensing situation

2020-05-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107974#comment-17107974
 ] 

Piotr Nowojski commented on FLINK-17706:


I'm ok with relicensing my contributions to the Apache License 2.0

> Clarify licensing situation
> ---
>
> Key: FLINK-17706
> URL: https://issues.apache.org/jira/browse/FLINK-17706
> Project: Flink
>  Issue Type: Sub-task
>  Components: Benchmarks
>Affects Versions: 1.11.0
>Reporter: Nico Kruber
>Priority: Major
> Fix For: 1.11.0
>
>
> After enabling the rat plugin, it found the following files with missing or 
> invalid license headers, broken down by the contributors I could find in the 
> git history. If I see this correctly, every contributor should acknowledge 
> the change of their files to the Apache License; then we can add the 
> license headers and continue the project move:
>  * [~rgrebennikov] + [~NicoK]
> {code:java}
>   
> src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
>   
> src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
>  {code}
>  * [~sunhaibotb]
> {code:java}
>   src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java
>   
> src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java{code}
>  * [~pnowojski]
> {code:java}
>   src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java
>   src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
>   src/main/java/org/apache/flink/benchmark/functions/LongSource.java
>   src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java
>   src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java
>   src/main/java/org/apache/flink/benchmark/functions/SuccessException.java
>   src/main/java/org/apache/flink/benchmark/functions/SumReduce.java
>   src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java
>   src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java
>   src/main/java/org/apache/flink/benchmark/CollectSink.java{code}
>  * [~pnowojski]  + [~sunhaibotb]  + [~xintongsong]
> {code:java}
>src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java{code}
>  * [~NicoK]
> {code:java}
>   src/main/resources/avro/mypojo.avsc
>   src/main/resources/protobuf/MyPojo.proto
>   src/main/resources/thrift/mypojo.thrift{code}
>  * [~pnowojski] + [~liyu]
> {code:java}
>   save_jmh_result.py{code}
> The license should be clarified with the author and all contributors of that 
> file.
> Please, every tagged contributor, express your decision (whether you are fine 
> with the change) below, so we can continue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17714) Support custom RestartBackoffTimeStrategy

2020-05-14 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17714:
---

 Summary: Support custom RestartBackoffTimeStrategy
 Key: FLINK-17714
 URL: https://issues.apache.org/jira/browse/FLINK-17714
 Project: Flink
  Issue Type: Wish
  Components: Runtime / Coordination
Reporter: Zhu Zhu


There are cases where users need to customize the RestartBackoffTimeStrategy to 
better control job recovery. 
One example is that users want a job to restart only on certain errors and fail 
on others. See this ML 
[discussion|https://lists.apache.org/thread.html/rde685552a83d0d146cf83560df1bc6f33d3dd569f69ae7bbcc4ae508%40%3Cuser.flink.apache.org%3E].
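
A minimal sketch of what such a custom strategy could look like, assuming the shape of the internal RestartBackoffTimeStrategy interface at the time (notifyFailure / canRestart / getBackoffTime); the error classification below is purely illustrative:

{code:java}
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;

import java.io.IOException;

/** Restarts the job only on errors assumed to be transient; fails it on everything else. */
public class SelectiveRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

    private volatile boolean lastFailureTransient = true;

    @Override
    public void notifyFailure(Throwable cause) {
        // Illustrative classification: treat IO problems as transient, everything else as fatal.
        lastFailureTransient = cause instanceof IOException;
    }

    @Override
    public boolean canRestart() {
        return lastFailureTransient;
    }

    @Override
    public long getBackoffTime() {
        return 1_000L; // fixed one-second delay before restarting
    }
}
{code}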





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12161: [FLINK-17450][sql-parser][hive] Implement function & catalog DDLs for Hive dialect

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12161:
URL: https://github.com/apache/flink/pull/12161#issuecomment-629011624


   
   ## CI report:
   
   * fb208fbfdf55bdde668c609a3305d938b8105f95 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1363)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12157: [FLINK-17691][Connectors/Kafka] fix the bug FlinkKafkaProducer011 transactional.id too long when use Semantic.EXACTLY_ONCE mode

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12157:
URL: https://github.com/apache/flink/pull/12157#issuecomment-628732224


   
   ## CI report:
   
   * c72006072a887727d7ac814ef85576ae3c9a6bd7 UNKNOWN
   * 409246b65de6c5e76e6462ed4a55d73c7dc76c22 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/166181069) Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1366)
 
   * 07dcda24cfb9baa75b14b5ec953eabb9603e3082 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12112: [FLINK-16075][docs-zh] Translate "The Broadcast State Pattern" page into Chinese

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12112:
URL: https://github.com/apache/flink/pull/12112#issuecomment-627844596


   
   ## CI report:
   
   * fabb16cd67f08a76f43a0c3b62d2b48e7b6a6642 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1368)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12150: [FLINK-17026][kafka] Introduce a new Kafka connector with new proper…

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12150:
URL: https://github.com/apache/flink/pull/12150#issuecomment-628604536


   
   ## CI report:
   
   * 5ecc1a2c3a42b5b10f837d5c7a50f76dc31d6a97 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1370)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12143: [FLINK-17632][yarn] Support to specify a remote path for job jar

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12143:
URL: https://github.com/apache/flink/pull/12143#issuecomment-628420573


   
   ## CI report:
   
   * dcd8b8efec513fcfa4d833ab2915e7171eee6ace Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1324)
 
   * 0815d70382a6f6630beca0aff985e8dfdf480d32 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1360)
 
   * cbaf6e2779ce08f2e38fd84522fa1a6843468e83 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12108: [FLINK-17448][sql-parser][table-api-java][table-planner-blink][hive] Implement table DDLs for Hive dialect part2

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12108:
URL: https://github.com/apache/flink/pull/12108#issuecomment-627804642


   
   ## CI report:
   
   * b16d233a709d1d9794b8ff261dfa598a9d7530d5 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1250)
 
   * 66bbeb2e19f810744287740e7a691da17ebb8b0c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12069: [FLINK-14807][streaming] Add specialized collecting sink function

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12069:
URL: https://github.com/apache/flink/pull/12069#issuecomment-626515528


   
   ## CI report:
   
   * 731dff0812f1abb16193f7a2fe6f54b3ef65ae2d UNKNOWN
   * b90556c18fabe5fcfc3e93ba8c6b6a44b0d21f88 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1330)
 
   * de5d91a75454c9db885b3a2b81acfca9c0f5f34e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1357)
 
   * e90ac74644df86278815a6a266b48ba7aeefbd7c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhijiangW commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425580357



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
+
+class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
+   private final CheckpointBarrierAligner alignedHandler;
+   private final CheckpointBarrierUnaligner unalignedHandler;
+   private CheckpointBarrierHandler activeHandler;
+
+   AlternatingCheckpointBarrierHandler(CheckpointBarrierAligner 
alignedHandler, CheckpointBarrierUnaligner unalignedHandler, AbstractInvokable 
invokable) {
+   super(invokable);
+   this.activeHandler = this.alignedHandler = alignedHandler;
+   this.unalignedHandler = unalignedHandler;
+   }
+
+   @Override
+   public void releaseBlocksAndResetBarriers() {
+   activeHandler.releaseBlocksAndResetBarriers();
+   }
+
+   @Override
+   public boolean isBlocked(int channelIndex) {
+   return activeHandler.isBlocked(channelIndex);
+   }
+
+   @Override
+   public void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
+   CheckpointBarrierHandler previousHandler = activeHandler;
+   activeHandler = receivedBarrier.isCheckpoint() ? 
unalignedHandler : alignedHandler;
+   abortPreviousIfNeeded(receivedBarrier, previousHandler);
+   activeHandler.processBarrier(receivedBarrier, channelIndex);
+   }
+
+   private void abortPreviousIfNeeded(CheckpointBarrier barrier, 
CheckpointBarrierHandler prevHandler) throws IOException {
+   if (prevHandler != activeHandler && 
prevHandler.isCheckpointPending() && prevHandler.getLatestCheckpointId() < 
barrier.getId()) {
+   prevHandler.releaseBlocksAndResetBarriers();
+   notifyAbort(
+   prevHandler.getLatestCheckpointId(),
+   new CheckpointException(
+   format("checkpoint %d subsumed by %d", 
prevHandler.getLatestCheckpointId(), barrier.getId()),
+   CHECKPOINT_DECLINED_SUBSUMED));
+   }
+   }
+
+   @Override
+   public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+   activeHandler.processCancellationBarrier(cancelBarrier);
+   }
+
+   @Override
+   public void processEndOfPartition() throws Exception {
+   alignedHandler.processEndOfPartition();

Review comment:
   A bit confused here why we need to call both handlers; shouldn't only the 
active handler be working at a given time?

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425580321



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -157,6 +158,13 @@
 
GlobalAggregateManager getGlobalAggregateManager();
 
+   /**
+* Get the enabled external resource drivers for external resources.
+*
+* @return the enabled external resource drivers for external resources
+*/
+   Map getExternalResourceDrivers();

Review comment:
   I think `Map getExternalResourceInfos` 
would be good enough for the current version.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #11010:
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412


   
   ## CI report:
   
   * 27818c93f9a8d1f580b96bca10111f0cb95bbb07 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1364)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu commented on pull request #12092: [FLINK-17596][python] Move the Python UDF runner script out of the jar of flink-python

2020-05-14 Thread GitBox


dianfu commented on pull request #12092:
URL: https://github.com/apache/flink/pull/12092#issuecomment-629041252


   Thanks. Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-17606) Introduce DataGen connector in table

2020-05-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-17606.

Resolution: Implemented

master: 91557c83447444ecfbeba3e4c58297908940834a

> Introduce DataGen connector in table
> 
>
> Key: FLINK-17606
> URL: https://issues.apache.org/jira/browse/FLINK-17606
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {code:java}
> CREATE TABLE user (
>     id BIGINT,
>     age INT,
>     description STRING
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='100',
>     'fields.id.kind' = 'sequence',
>     'fields.id.start' = '1',
>     'fields.age.kind' = 'random',
>     'fields.age.min' = '0',
>     'fields.age.max' = '100',
>     'fields.description.kind' = 'random',
>     'fields.description.length' = '100'
> )
> -- Default is random generator.
> {code}
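
For reference, a minimal sketch of exercising such a table from the Java Table API (assuming the executeSql and TableResult.print APIs of 1.11; the table name is a placeholder that avoids the reserved word USER):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DataGenSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // All fields fall back to the random generator by default, as noted above.
        tEnv.executeSql(
                "CREATE TABLE user_source (id BIGINT, age INT, description STRING) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '100')");
        // Continuously prints generated rows to stdout.
        tEnv.executeSql("SELECT id, age, description FROM user_source").print();
    }
}
{code}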



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17606) Introduce DataGen connector in table

2020-05-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-17606:
-
Component/s: Table SQL / API

> Introduce DataGen connector in table
> 
>
> Key: FLINK-17606
> URL: https://issues.apache.org/jira/browse/FLINK-17606
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {code:java}
> CREATE TABLE user (
>     id BIGINT,
>     age INT,
>     description STRING
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='100',
>     'fields.id.kind' = 'sequence',
>     'fields.id.start' = '1',
>     'fields.age.kind' = 'random',
>     'fields.age.min' = '0',
>     'fields.age.max' = '100',
>     'fields.description.kind' = 'random',
>     'fields.description.length' = '100'
> )
> -- Default is random generator.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi merged pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

2020-05-14 Thread GitBox


JingsongLi merged pull request #12074:
URL: https://github.com/apache/flink/pull/12074


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] WeiZhong94 commented on pull request #12092: [FLINK-17596][python] Move the Python UDF runner script out of the jar of flink-python

2020-05-14 Thread GitBox


WeiZhong94 commented on pull request #12092:
URL: https://github.com/apache/flink/pull/12092#issuecomment-629040680


   @dianfu The test has passed on my Azure pipeline: 
https://dev.azure.com/weizhong0618/Flink/_build/results?buildId=101=results



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-17713) "Stateful stream job upgrade end-to-end test" fails

2020-05-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-17713.
--
Resolution: Fixed

> "Stateful stream job upgrade end-to-end test" fails
> ---
>
> Key: FLINK-17713
> URL: https://issues.apache.org/jira/browse/FLINK-17713
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Robert Metzger
>Priority: Blocker
>
> CI: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1348=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> Potentially caused by this:
> {code}
> 2020-05-15T04:46:20.7037837Z 2020-05-15 04:46:11,134 WARN  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
> Fail to subsume the old checkpoint.
> 2020-05-15T04:46:20.7038858Z java.io.IOException: Directory 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-56851042201/savepoint-e2e-test-chckpt-dir/33e34191d2f3c84d9b7eb5898d3a34fc/chk-3
>  is not empty
> 2020-05-15T04:46:20.7039955Z  at 
> org.apache.flink.core.fs.local.LocalFileSystem.delete(LocalFileSystem.java:192)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7041042Z  at 
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7042317Z  at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7043438Z  at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7044540Z  at 
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7045684Z  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1003)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7047005Z  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:910)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7048039Z  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7048663Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_252]
> 2020-05-15T04:46:20.7049114Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
> 2020-05-15T04:46:20.7049643Z  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7050291Z  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7057390Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7058064Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7058510Z  at java.lang.Thread.run(Thread.java:748) 
> [?:1.8.0_252]
> 2
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-17467) Implement aligned savepoint in UC mode

2020-05-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-17467:


Reopened because of FLINK-17713

> Implement aligned savepoint in UC mode
> --
>
> Key: FLINK-17467
> URL: https://issues.apache.org/jira/browse/FLINK-17467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 1.11.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425577548



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -157,6 +158,13 @@
 
GlobalAggregateManager getGlobalAggregateManager();
 
+   /**
+* Get the enabled external resource drivers for external resources.
+*
+* @return the enabled external resource drivers for external resources
+*/
+   Map getExternalResourceDrivers();

Review comment:
   I think it would be good enough to expose `Set 
getExternalResourceInfo(String resourceName)`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17713) "Stateful stream job upgrade end-to-end test" fails

2020-05-14 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107967#comment-17107967
 ] 

Robert Metzger commented on FLINK-17713:


Reverted in 
https://github.com/apache/flink/commit/3af17562eb791e3f486c38dbd94dc3328309b262

> "Stateful stream job upgrade end-to-end test" fails
> ---
>
> Key: FLINK-17713
> URL: https://issues.apache.org/jira/browse/FLINK-17713
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Robert Metzger
>Priority: Blocker
>
> CI: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1348=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> Potentially caused by this:
> {code}
> 2020-05-15T04:46:20.7037837Z 2020-05-15 04:46:11,134 WARN  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
> Fail to subsume the old checkpoint.
> 2020-05-15T04:46:20.7038858Z java.io.IOException: Directory 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-56851042201/savepoint-e2e-test-chckpt-dir/33e34191d2f3c84d9b7eb5898d3a34fc/chk-3
>  is not empty
> 2020-05-15T04:46:20.7039955Z  at 
> org.apache.flink.core.fs.local.LocalFileSystem.delete(LocalFileSystem.java:192)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7041042Z  at 
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7042317Z  at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7043438Z  at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7044540Z  at 
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7045684Z  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1003)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7047005Z  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:910)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7048039Z  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 2020-05-15T04:46:20.7048663Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_252]
> 2020-05-15T04:46:20.7049114Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
> 2020-05-15T04:46:20.7049643Z  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7050291Z  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7057390Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7058064Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_252]
> 2020-05-15T04:46:20.7058510Z  at java.lang.Thread.run(Thread.java:748) 
> [?:1.8.0_252]
> 2
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17565) Bump fabric8 version from 4.5.2 to 4.9.1

2020-05-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107965#comment-17107965
 ] 

Yang Wang commented on FLINK-17565:
---

Maybe we need to wait for the release of 4.9.2 and then bump the version, since 
we found that kubernetes-client cannot work with JDK 8u252 [1]. This will be 
resolved in 4.9.2 and 4.10.2.

I will also keep an eye on the release.

 

[1]. [https://github.com/fabric8io/kubernetes-client/issues/2212]

> Bump fabric8 version from 4.5.2 to 4.9.1
> 
>
> Key: FLINK-17565
> URL: https://issues.apache.org/jira/browse/FLINK-17565
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, we are using version 4.5.2; it would be better to upgrade to 
> 4.9.1. Some of the reasons are as follows:
> # It removed the use of reapers manually doing cascade deletion of resources, 
> leaving it up to the Kubernetes APIServer, which solves the issue of FLINK-17566; 
> more info: https://github.com/fabric8io/kubernetes-client/issues/1880
> # It introduced a regression in building Quantity values in 4.7.0, release 
> note https://github.com/fabric8io/kubernetes-client/issues/1953.
> # It provided better support for K8s 1.17, release note: 
> https://github.com/fabric8io/kubernetes-client/releases/tag/v4.7.0.
> For more release notes, please refer to [fabric8 
> releases|https://github.com/fabric8io/kubernetes-client/releases].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17713) "Stateful stream job upgrade end-to-end test" fails

2020-05-14 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-17713:
--

 Summary: "Stateful stream job upgrade end-to-end test" fails
 Key: FLINK-17713
 URL: https://issues.apache.org/jira/browse/FLINK-17713
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Robert Metzger


CI: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1348=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5

Potentially caused by this:
{code}
2020-05-15T04:46:20.7037837Z 2020-05-15 04:46:11,134 WARN  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Fail to subsume the old checkpoint.
2020-05-15T04:46:20.7038858Z java.io.IOException: Directory 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-56851042201/savepoint-e2e-test-chckpt-dir/33e34191d2f3c84d9b7eb5898d3a34fc/chk-3
 is not empty
2020-05-15T04:46:20.7039955Zat 
org.apache.flink.core.fs.local.LocalFileSystem.delete(LocalFileSystem.java:192) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7041042Zat 
org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7042317Zat 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7043438Zat 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7044540Zat 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7045684Zat 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1003)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7047005Zat 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:910)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7048039Zat 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
2020-05-15T04:46:20.7048663Zat 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_252]
2020-05-15T04:46:20.7049114Zat 
java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
2020-05-15T04:46:20.7049643Zat 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_252]
2020-05-15T04:46:20.7050291Zat 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_252]
2020-05-15T04:46:20.7057390Zat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_252]
2020-05-15T04:46:20.7058064Zat 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_252]
2020-05-15T04:46:20.7058510Zat java.lang.Thread.run(Thread.java:748) 
[?:1.8.0_252]
2
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425575563



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
##
@@ -183,8 +186,9 @@ protected void closeLocalStrategiesAndCaches() {
@Override
public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup 
metrics) {
Environment env = getEnvironment();
+
return new IterativeRuntimeUdfContext(env.getTaskInfo(), 
getUserCodeClassLoader(),
-   getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+   getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics, 
ExternalResourceUtils.getExternalResourceInfo(env.getExternalResourceDrivers(), 
env.getTaskManagerInfo().getConfiguration()));

Review comment:
   I think it would not be expensive since we only retrieve the 
`ExternalResourceInfos` the first time. After that, it uses the cache in 
`SharedExternalResources`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] TsReaper commented on pull request #12069: [FLINK-14807][streaming] Add specialized collecting sink function

2020-05-14 Thread GitBox


TsReaper commented on pull request #12069:
URL: https://github.com/apache/flink/pull/12069#issuecomment-629037091


   > CollectSinkOperatorCoordinator
   
   `CollectSinkOperatorCoordinator` is also tested along with the sink function.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17416) Flink-kubernetes doesn't work on java 8 8u252

2020-05-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107960#comment-17107960
 ] 

Yang Wang commented on FLINK-17416:
---

Update on this issue.

 

After some discussion with the author of the fabric8 kubernetes-client [1], we 
have reached the conclusion that kubernetes-client cannot work with JDK 8u252 
because of an okhttp bug. Currently, we can use "export HTTP2_DISABLE=true" to 
disable HTTP/2.

Once the new fabric8 kubernetes-client versions (4.9.2, 4.10.2) are released, we 
need to bump the version to get a clean solution.

 

[1]. [https://github.com/fabric8io/kubernetes-client/issues/2212]

> Flink-kubernetes doesn't work on java 8 8u252
> -
>
> Key: FLINK-17416
> URL: https://issues.apache.org/jira/browse/FLINK-17416
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.11.0
>Reporter: wangxiyuan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: log.k8s.session.8u252
>
>
> When using the java-8-8u252 version, the Flink container end-to-end test failed. The 
> test `Running 'Run kubernetes session test'` fails with a `Broken pipe` 
> error.
> See:
> [https://logs.openlabtesting.org/logs/periodic-20-flink-mail/github.com/apache/flink/master/flink-end-to-end-test-arm64-container/fcfdd47/job-output.txt.gz]
>  
> Flink Azure CI doesn't hit this problem because it runs under jdk-8-8u242
>  
> The reason is that the okhttp library which Flink uses doesn't work on 
> java-8-8u252:
> [https://github.com/square/okhttp/issues/5970]
>  
> The problem has been fixed with the PR:
> [https://github.com/square/okhttp/pull/5977]
>  
> Maybe we can wait for a new 3.12.x release and bump the okhttp version in 
> Flink later.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] rmetzger commented on pull request #12160: [FLINK-17710][python][hotfix] StreamSqlTests.test_execute_sql test is…

2020-05-14 Thread GitBox


rmetzger commented on pull request #12160:
URL: https://github.com/apache/flink/pull/12160#issuecomment-629036740


   I don't know what happened there. I manually triggered a build for commit 
"be9779df4a817697788ed44fc618900595b248d8" here: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1372=results



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17710) StreamSqlTests.test_execute_sql test is not stable

2020-05-14 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107958#comment-17107958
 ] 

Hequn Cheng commented on FLINK-17710:
-

Changes were reverted on master for now since the test started behaving abnormally. See 
more details 
[here|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1354=results].
 Reverted via 7982281a190415f86d589f17b3c2ce4474d2e42d

> StreamSqlTests.test_execute_sql test is not stable
> --
>
> Key: FLINK-17710
> URL: https://issues.apache.org/jira/browse/FLINK-17710
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Hequn Cheng
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> Failure log:
> https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/1311/logs/144



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KurtYoung commented on pull request #12069: [FLINK-14807][streaming] Add specialized collecting sink function

2020-05-14 Thread GitBox


KurtYoung commented on pull request #12069:
URL: https://github.com/apache/flink/pull/12069#issuecomment-629035731


   We still need a multi-threaded test for `CollectSinkFunction` covering the case 
when the buffer is full and blocks the main thread.
   We also need to add some tests for `CollectSinkOperatorCoordinator`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17303) Return TableResult for Python TableEnvironment

2020-05-14 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107957#comment-17107957
 ] 

Hequn Cheng commented on FLINK-17303:
-

Changes were reverted in master due to FLINK-17710 via 
70346b2c7ee8e449cc597ab4f19bc02cf6117f86

> Return TableResult for Python TableEnvironment
> --
>
> Key: FLINK-17303
> URL: https://issues.apache.org/jira/browse/FLINK-17303
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: godfrey he
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> [FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] supports 
> executing a statement and returning a {{TableResult}} object, which can get a 
> {{JobClient}} (associated with the submitted Flink job), collect the execution 
> result, or print the execution result. In Python, we should also introduce a 
> Python TableResult class to keep it consistent with Java.
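
For context, a minimal sketch of the Java-side TableResult usage that the Python class should mirror (assuming the interface shape from FLINK-16366: getJobClient, collect, print; the table name is a placeholder):

{code:java}
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class TableResultSketch {
    static void show(TableEnvironment tEnv) {
        TableResult result = tEnv.executeSql("SELECT * FROM some_table");
        // JobClient associated with the submitted Flink job, if any.
        result.getJobClient().ifPresent(client -> System.out.println(client.getJobID()));
        // Print the execution result to stdout.
        result.print();
    }
}
{code}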



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425572456



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##
@@ -35,9 +35,9 @@
 
private final SinkTransformation transformation;
 
-   @SuppressWarnings("unchecked")
protected DataStreamSink(DataStream inputStream, StreamSink 
operator) {
-   this.transformation = new 
SinkTransformation(inputStream.getTransformation(), "Unnamed", operator, 
inputStream.getExecutionEnvironment().getParallelism());

Review comment:
   I will revert it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425571792



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A customized {@link StreamOperator} for executing {@link FlinkKafkaShuffleProducer}
+ * that handles both elements and watermarks. If the shuffle sink turns out to be
+ * useful to other sinks in the future, we should promote this operator to the
+ * DataStream API. For now, we keep the operator as it is to avoid public
+ * interface changes.
+ */
+@Internal
+class StreamKafkaShuffleSink extends StreamSink {
+
+   public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer 
flinkKafkaShuffleProducer) {
+   super(flinkKafkaShuffleProducer);
+   }
+
+   @Override
+   public void processWatermark(Watermark mark) throws Exception {
+   super.processWatermark(mark);
+   this.currentWatermark = mark.getTimestamp();

Review comment:
   That's a good catch! Thanks!!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] hequn8128 edited a comment on pull request #12160: [FLINK-17710][python][hotfix] StreamSqlTests.test_execute_sql test is…

2020-05-14 Thread GitBox


hequn8128 edited a comment on pull request #12160:
URL: https://github.com/apache/flink/pull/12160#issuecomment-629031753


   I have reverted the related commits as a safety precaution. The test 
starts normally now. I will check more later. Sorry about this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] chaojianok commented on pull request #247: [FLINK-13683] Translate "Code Style - Component Guide" page into Chinese

2020-05-14 Thread GitBox


chaojianok commented on pull request #247:
URL: https://github.com/apache/flink-web/pull/247#issuecomment-629033247


   @klion26 Thanks a lot for your review. I've updated the PR according to your 
suggestions; please review it again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai opened a new pull request #111: [FLINK-17712] [build] Upgrade Flink version to 1.10.1

2020-05-14 Thread GitBox


tzulitai opened a new pull request #111:
URL: https://github.com/apache/flink-statefun/pull/111


   Note that this would not pass as of now, since the Docker image 
`flink:1.10.1` probably isn't published yet. We should merge this only after 
that is available.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17712) Upgrade Flink version to 1.10.1 in Stateful Functions

2020-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-17712:
---
Labels: pull-request-available  (was: )

> Upgrade Flink version to 1.10.1 in Stateful Functions
> -
>
> Key: FLINK-17712
> URL: https://issues.apache.org/jira/browse/FLINK-17712
> Project: Flink
>  Issue Type: Task
>  Components: Stateful Functions
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: statefun-2.0.1, statefun-2.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12132: [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12132:
URL: https://github.com/apache/flink/pull/12132#issuecomment-628151415


   
   ## CI report:
   
   * 09465d2623a839311aa67ddfac4059f525f2b62c Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1346)
 
   * 2438c1edcf3487925d58163ad271834b2fd1bf43 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1369)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12150: [FLINK-17026][kafka] Introduce a new Kafka connect or with new proper…

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12150:
URL: https://github.com/apache/flink/pull/12150#issuecomment-628604536


   
   ## CI report:
   
   * e0e57e5b94fe42e04146af4796431cdc40efc2bb Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1293)
 
   * 5ecc1a2c3a42b5b10f837d5c7a50f76dc31d6a97 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1370)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12112: [FLINK-16075][docs-zh] Translate "The Broadcast State Pattern" page into Chinese

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12112:
URL: https://github.com/apache/flink/pull/12112#issuecomment-627844596


   
   ## CI report:
   
   * caebbb2978136a5c36059cef6dcfde1b97f26456 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1156)
 
   * fabb16cd67f08a76f43a0c3b62d2b48e7b6a6642 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1368)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11385: [FLINK-11395][formats] Support for Avro StreamingFileSink

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #11385:
URL: https://github.com/apache/flink/pull/11385#issuecomment-597956562


   
   ## CI report:
   
   * 9e1bbeafa1a4d0f673ebba7f87ef9032253a29f0 UNKNOWN
   * 88eb4f91c72b735fe947ea5beb4ab430138a6077 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1361)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] hequn8128 commented on pull request #12160: [FLINK-17710][python][hotfix] StreamSqlTests.test_execute_sql test is…

2020-05-14 Thread GitBox


hequn8128 commented on pull request #12160:
URL: https://github.com/apache/flink/pull/12160#issuecomment-629031753


   Still not sure about the reasons, so I have reverted the related commits as 
a safety precaution. The test starts normally now. I will check more later. 
Sorry about this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425570222



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+   static final String PRODUCER_PARALLELISM = "producer parallelism";
+   static final String PARTITION_NUMBER = "partition number";
+
+   /**
+* Write to and read from a Kafka shuffle with the partition decided by keys.
+* Consumers should read the partitions equal to the key group indices they are assigned.
+* The number of partitions is the maximum parallelism of the receiving operator.
+* This version only supports numberOfPartitions = consumerParallelism.

Review comment:
   Do you mean specifically saying "producerParallelism != 
numberOfPartitions"?
   
   I think since this is a keyBy through a persistent channel, the producer 
parallelism has nothing to do with the max key group size. I am afraid such a 
comment would be confusing.
   
   For config changes, yes, probably in the next step. Overall, this is only a 
very first version; allowing users to set the parallelism when writing 
pipelines might be enough. 
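
   For illustration, a minimal sketch of the routing idea (assuming each Kafka 
partition corresponds to one key group of the receiving operator, so the 
producer parallelism is irrelevant); `partitionFor` is a hypothetical helper, 
not code from this PR:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

final class KeyGroupRouting {
	// Every producer subtask, whatever the producer parallelism, writes a
	// record to the partition equal to the key group of the record's key.
	// numberOfPartitions plays the role of the consumer's max parallelism.
	static int partitionFor(Object key, int numberOfPartitions) {
		return KeyGroupRangeAssignment.assignToKeyGroup(key, numberOfPartitions);
	}
}
```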
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17712) Upgrade Flink version to 1.10.1 in Stateful Functions

2020-05-14 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-17712:
---

 Summary: Upgrade Flink version to 1.10.1 in Stateful Functions
 Key: FLINK-17712
 URL: https://issues.apache.org/jira/browse/FLINK-17712
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: statefun-2.0.1, statefun-2.1.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #110: [FLINK-17611] Unix domain socket connection between worker and function

2020-05-14 Thread GitBox


tzulitai commented on a change in pull request #110:
URL: https://github.com/apache/flink-statefun/pull/110#discussion_r425563507



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType 
type) {
 if (spec == null) {
   throw new IllegalArgumentException("Unsupported type " + type);
 }
-// specific client reuses the same the connection pool and thread pool
-// as the sharedClient.
-OkHttpClient specificClient =
-
sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-RequestReplyClient httpClient =
-new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), 
specificClient);
-return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), 
httpClient);
+return new RequestReplyFunction(
+spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+// We need to build a UDS HTTP client
+if (spec.unixDomainSocket() != null) {
+  OkHttpClient specificClient =
+  sharedClient
+  .newBuilder()
+  .socketFactory(new 
AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))

Review comment:
   Is there a reason that you chose not to implement a custom UDS socket 
factory like in the example at 
https://github.com/square/okhttp/tree/master/samples/unixdomainsockets/src/main/java/okhttp3/unixdomainsockets?
   
   I'm asking only because we should try to have the `statefun-flink-core` 
dependencies as slim as possible, as it directly influences the size of our 
base distributions. It might also not be too big of a deal if the 
`junixsocket-core` library is small.
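
   For reference, a rough sketch of such a hand-rolled factory (modelled on the 
okhttp sample, but still backed by junixsocket here, so it would mainly save 
the factory classes rather than the dependency; a production version would also 
have to tolerate okhttp calling connect() on the returned socket):

```java
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import javax.net.SocketFactory;

import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;

final class UnixDomainSocketFactory extends SocketFactory {

	private final File socketFile;

	UnixDomainSocketFactory(File socketFile) {
		this.socketFile = socketFile;
	}

	@Override
	public Socket createSocket() throws IOException {
		// connect to the unix domain socket; host/port are ignored entirely
		return AFUNIXSocket.connectTo(new AFUNIXSocketAddress(socketFile));
	}

	@Override
	public Socket createSocket(String host, int port) throws IOException {
		return createSocket();
	}

	@Override
	public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException {
		return createSocket();
	}

	@Override
	public Socket createSocket(InetAddress host, int port) throws IOException {
		return createSocket();
	}

	@Override
	public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
		return createSocket();
	}
}
```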

##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType 
type) {
 if (spec == null) {
   throw new IllegalArgumentException("Unsupported type " + type);
 }
-// specific client reuses the same the connection pool and thread pool
-// as the sharedClient.
-OkHttpClient specificClient =
-
sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-RequestReplyClient httpClient =
-new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), 
specificClient);
-return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), 
httpClient);
+return new RequestReplyFunction(
+spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+// We need to build a UDS HTTP client
+if (spec.unixDomainSocket() != null) {
+  OkHttpClient specificClient =
+  sharedClient
+  .newBuilder()
+  .socketFactory(new 
AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))
+  // Enable HTTP/2 if available (uses H2 upgrade),
+  // otherwise fallback to HTTP/1.1
+  .protocols(Collections.singletonList(Protocol.HTTP_2))
+  .callTimeout(spec.maxRequestDuration())
+  .build();
+
+  return new HttpRequestReplyClient(
+  // Only the path matters!
+  HttpUrl.get(URI.create(spec.endpoint().getPath())), specificClient);

Review comment:
   For easier readability: maybe it makes sense to add to the spec class a 
getter method which just returns the path string without the scheme.
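
   Something like this hypothetical getter (the name and shape are assumptions, 
not code from this PR):

```java
// hypothetical convenience getter on HttpFunctionSpec; endpoint() is the
// existing URI-typed accessor already used elsewhere in this class
public String endpointPath() {
	// keep only the path of e.g. unix:///var/run/worker.sock
	return endpoint().getPath();
}
```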





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12150: [FLINK-17026][kafka] Introduce a new Kafka connect or with new proper…

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12150:
URL: https://github.com/apache/flink/pull/12150#issuecomment-628604536


   
   ## CI report:
   
   * e0e57e5b94fe42e04146af4796431cdc40efc2bb Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1293)
 
   * 5ecc1a2c3a42b5b10f837d5c7a50f76dc31d6a97 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12112: [FLINK-16075][docs-zh] Translate "The Broadcast State Pattern" page into Chinese

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12112:
URL: https://github.com/apache/flink/pull/12112#issuecomment-627844596


   
   ## CI report:
   
   * caebbb2978136a5c36059cef6dcfde1b97f26456 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1156)
 
   * fabb16cd67f08a76f43a0c3b62d2b48e7b6a6642 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12157: [FLINK-17691][Connectors/Kafka] fix the bug FlinkKafkaProducer011 transactional.id too long when use Semantic.EXACTLY_ONCE mode

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12157:
URL: https://github.com/apache/flink/pull/12157#issuecomment-628732224


   
   ## CI report:
   
   * c72006072a887727d7ac814ef85576ae3c9a6bd7 UNKNOWN
   * 409246b65de6c5e76e6462ed4a55d73c7dc76c22 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/166181069) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1366)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12132: [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12132:
URL: https://github.com/apache/flink/pull/12132#issuecomment-628151415


   
   ## CI report:
   
   * 09465d2623a839311aa67ddfac4059f525f2b62c Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1346)
 
   * 2438c1edcf3487925d58163ad271834b2fd1bf43 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12025: [FLINK-17435] [hive] Hive non-partitioned source supports streaming read

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12025:
URL: https://github.com/apache/flink/pull/12025#issuecomment-625292121


   
   ## CI report:
   
   * 04db04d8d1b41ca05618c5b09b83b3031bf7ee11 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1313)
 
   * d4b70dbd62d22edbaf0c3ff54ca61a35ac93c04e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1356)
 
   * c38c037f31687b4d20ef0e183d42cc5f64d3bb45 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1365)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-15648) Support to configure limit for CPU requirement

2020-05-14 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-15648:
-
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to configure limit for CPU requirement
> --
>
> Key: FLINK-15648
> URL: https://issues.apache.org/jira/browse/FLINK-15648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current branch uses kubernetes.xx.cpu to configure both the request and 
> the limit resource requirement for a Container. It would be an important 
> improvement to separate these two configurations: we can use 
> kubernetes.xx.request.cpu and kubernetes.xx.limit.cpu to specify the request 
> and limit resource requirements separately.
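
For illustration, with xx = jobmanager the proposal might look like this in 
flink-conf.yaml (hypothetical keys, following the naming proposed above):
{code}
kubernetes.jobmanager.request.cpu: 1.0  # amount requested from the Kubernetes scheduler
kubernetes.jobmanager.limit.cpu: 2.0    # hard cap enforced by Kubernetes
{code}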



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15648) Support to configure limit for CPU requirement

2020-05-14 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-15648:
-
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support to configure limit for CPU requirement
> --
>
> Key: FLINK-15648
> URL: https://issues.apache.org/jira/browse/FLINK-15648
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current branch uses kubernetes.xx.cpu to configure both the request and 
> the limit resource requirement for a Container. It would be an important 
> improvement to separate these two configurations: we can use 
> kubernetes.xx.request.cpu and kubernetes.xx.limit.cpu to specify the request 
> and limit resource requirements separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #11010:
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412


   
   ## CI report:
   
   * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
 
   * 27818c93f9a8d1f580b96bca10111f0cb95bbb07 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1364)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17706) Clarify licensing situation

2020-05-14 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107940#comment-17107940
 ] 

Yu Li commented on FLINK-17706:
---

I'm OK with licensing my contributions under the Apache License 2.0.

Thanks for driving this [~NicoK]!

> Clarify licensing situation
> ---
>
> Key: FLINK-17706
> URL: https://issues.apache.org/jira/browse/FLINK-17706
> Project: Flink
>  Issue Type: Sub-task
>  Components: Benchmarks
>Affects Versions: 1.11.0
>Reporter: Nico Kruber
>Priority: Major
> Fix For: 1.11.0
>
>
> After enabling the rat plugin, it finds the following files with missing or 
> invalid license headers, broken down by all contributors I could find in the 
> git history. If I see this correctly, every contributor should acknowledge 
> the change of their files to the Apache License and then we could add the 
> license headers and continue the project move:
>  * [~rgrebennikov] + [~NicoK]
> {code:java}
>   
> src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
>   
> src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
>  {code}
>  * [~sunhaibotb]
> {code:java}
>   src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java
>   
> src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java{code}
>  * [~pnowojski]
> {code:java}
>   src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java
>   src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
>   src/main/java/org/apache/flink/benchmark/functions/LongSource.java
>   src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java
>   src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java
>   src/main/java/org/apache/flink/benchmark/functions/SuccessException.java
>   src/main/java/org/apache/flink/benchmark/functions/SumReduce.java
>   src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java
>   src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java
>   src/main/java/org/apache/flink/benchmark/CollectSink.java{code}
>  * [~pnowojski]  + [~sunhaibotb]  + [~xintongsong]
> {code:java}
>src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java{code}
>  * [~NicoK]
> {code:java}
>   src/main/resources/avro/mypojo.avsc
>   src/main/resources/protobuf/MyPojo.proto
>   src/main/resources/thrift/mypojo.thrift{code}
>  * [~pnowojski] + [~liyu]
> {code:java}
>   save_jmh_result.py{code}
> The license should be clarified with the author and all contributors of that 
> file.
> Please, every tagged contributor, express your decision (whether you are fine 
> with the change) below, so we can continue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on a change in pull request #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-05-14 Thread GitBox


wangyang0918 commented on a change in pull request #11010:
URL: https://github.com/apache/flink/pull/11010#discussion_r425563975



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##
@@ -91,6 +93,8 @@
/** The number of pods requested, but not yet granted. */
private int numPendingPodRequests = 0;
 
+   private KubernetesWatch podsWatch;

Review comment:
   Every open source project has its own lifecycle. I can see the official 
K8s Java client is getting better and evolving quickly, so maybe we will 
replace the underlying implementation in the future. That is just a personal 
thought; certainly you could have your own, and we do not need to reach a 
consensus here.
   
   Either way, introducing the wrapper class does not add much burden to our 
current implementation, so I will keep it. 
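
   For what it's worth, the wrapper can stay very thin; a minimal sketch of 
the idea (mirroring the intent, not necessarily the PR's exact class):

```java
import java.io.Closeable;

import io.fabric8.kubernetes.client.Watch;

/**
 * Thin Flink-side handle around the fabric8 {@link Watch}, so callers never
 * touch the client library directly and the implementation can be swapped.
 */
public class KubernetesWatch implements Closeable {

	private final Watch watch;

	public KubernetesWatch(Watch watch) {
		this.watch = watch;
	}

	@Override
	public void close() {
		watch.close();
	}
}
```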





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425552834



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(1));
+   assertTrue(externalResources.containsKey(RESOURCE_NAME));
+   assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(0));

Review comment:
   If so, it should be `assertThat(externalResources.keySet(), 
is(empty()));`. For me, I prefer the current one. WDYT?





[jira] [Comment Edited] (FLINK-17611) Support unix domain sockets for sidecar communication in Stateful Functions

2020-05-14 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107933#comment-17107933
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-17611 at 5/15/20, 4:43 AM:
---

[~igal] concerning the YAML spec format for endpoints with UDS:

Does the {{unix://}} scheme convention also support endpoint paths?
Otherwise, I also see a {{http+unix://}} convention used by 
[requests-unixsocket|https://pypi.org/project/requests-unixsocket/] to specify 
both the socket file path and url when performing HTTP requests over UDS.

either way: I like that we use the scheme part of the endpoint URL to determine 
whether or not to talk via UDS, instead of an extra field in the YAML spec.
It seems like a known convention, and is more compact.


was (Author: tzulitai):
[~igal] concerning the YAML spec format:

Does the {{unix://}} scheme convention also support endpoint paths?
Otherwise, I also see a {{http+unix://}} convention used by 
[requests-unixsocket|https://pypi.org/project/requests-unixsocket/] to specify 
both the socket file path and url when performing HTTP requests over UDS.

either way: I like that we use the scheme part of the endpoint URL to determine 
whether or not to talk via UDS, instead of an extra field in the YAML spec.
It seems like a known convention, and is more compact.

> Support unix domain sockets for sidecar communication in Stateful Functions
> ---
>
> Key: FLINK-17611
> URL: https://issues.apache.org/jira/browse/FLINK-17611
> Project: Flink
>  Issue Type: New Feature
>  Components: Stateful Functions
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi all,
> I'm quite new to this project and I've started investigating its potential 
> usage in Kubernetes.
> I've found in the past that using Unix Domain Sockets across several containers 
> in the same pod gives an interesting performance boost and drastically 
> reduces the overhead of going through the network stack. Given that 
> containers in a pod run on the same host, it's perfectly reasonable to let 
> them communicate through unix domain sockets.
> If you're interested in such a feature, I'm more than willing to help 
> implement it, given a few pointers on where to start.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17611) Support unix domain sockets for sidecar communication in Stateful Functions

2020-05-14 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107933#comment-17107933
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-17611:
-

[~igal] concerning the YAML spec format:

Does the {{unix://}} scheme convention also support endpoint paths?
Otherwise, I also see a {{http+unix://}} convention used by 
[requests-unixsocket|https://pypi.org/project/requests-unixsocket/] to specify 
both the socket file path and url when performing HTTP requests over UDS.

either way: I like that we use the scheme part of the endpoint URL to determine 
whether or not to talk via UDS, instead of an extra field in the YAML spec.
It seems like a known convention, and is more compact.
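
For illustration, the two conventions would look roughly like this (socket 
path made up; requests-unixsocket percent-encodes the socket file into the 
host part of the URL):
{code}
unix:///var/run/functions/worker.sock
http+unix://%2Fvar%2Frun%2Ffunctions%2Fworker.sock/statefun
{code}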

> Support unix domain sockets for sidecar communication in Stateful Functions
> ---
>
> Key: FLINK-17611
> URL: https://issues.apache.org/jira/browse/FLINK-17611
> Project: Flink
>  Issue Type: New Feature
>  Components: Stateful Functions
>Reporter: Francesco Guardiani
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi all,
> I'm quite new to this project and I've started investigating its potential 
> usage in Kubernetes.
> I've found in the past that using Unix Domain Sockets across several containers 
> in the same pod gives an interesting performance boost and drastically 
> reduces the overhead of going through the network stack. Given that 
> containers in a pod run on the same host, it's perfectly reasonable to let 
> them communicate through unix domain sockets.
> If you're interested in such a feature, I'm more than willing to help 
> implement it, given a few pointers on where to start.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425562704



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+   static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+   /** Class used to set the extended resource. */
+   private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceSetResourceInformationMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceGetResourcesMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetNameMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetValueMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationNewInstanceMethod;
+
+   private final boolean isYarnResourceTypesAvailable;
+
+   private ResourceInformationReflector() {
+   this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+   }
+
+   @VisibleForTesting
+   ResourceInformationReflector(String resourceClassName, String 
resourceInfoClassName) {
+   Method resourceSetResourceInformationMethod = null;
+   Method resourceGetResourcesMethod = null;
+   Method resourceInformationGetNameMethod = null;
+   Method resourceInformationGetValueMethod = null;
+   Method resourceInformationNewInstanceMethod = null;
+   boolean isYarnResourceTypesAvailable = false;
+   try {
+   final Class resourceClass = 
Class.forName(resourceClassName);
+   final Class resourceInfoClass = 
Class.forName(resourceInfoClassName);
+   resourceSetResourceInformationMethod = 
resourceClass.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+   resourceGetResourcesMethod = 
resourceClass.getMethod("getResources");
+   resourceInformationGetNameMethod = 
resourceInfoClass.getMethod("getName");
+   resourceInformationGetValueMethod = 
resourceInfoClass.getMethod("getValue");
+   resourceInformationNewInstanceMethod = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+   isYarnResourceTypesAvailable = true;
+   } catch (Exception e) {
+   LOG.debug("The underlying Yarn does not support 
external resource.", e);
+   } finally {
+   this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
+   this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
+   this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
+   this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
+   this.resourceInformationNewInstanceMethod = 

[jira] [Updated] (FLINK-15662) Make K8s client timeouts configurable

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15662:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: Improvement)

> Make K8s client timeouts configurable
> -
>
> Key: FLINK-15662
> URL: https://issues.apache.org/jira/browse/FLINK-15662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>
> Kubernetes clients used in the client-side submission and requesting worker 
> pods should have configurable read and connect timeouts.
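
For illustration, a minimal sketch of where such settings would plug in with 
the fabric8 client (the Flink option names in the comments are made up):
{code:java}
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;

Config config = new ConfigBuilder()
    .withConnectionTimeout(30_000) // e.g. kubernetes.client.connect-timeout-ms
    .withRequestTimeout(60_000)    // e.g. kubernetes.client.request-timeout-ms
    .build();
DefaultKubernetesClient client = new DefaultKubernetesClient(config);
{code}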



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15662) Make K8s client timeouts configurable

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15662:
--
Parent: (was: FLINK-14460)
Issue Type: Improvement  (was: Sub-task)

> Make K8s client timeouts configurable
> -
>
> Key: FLINK-15662
> URL: https://issues.apache.org/jira/browse/FLINK-15662
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>
> Kubernetes clients used in the client-side submission and requesting worker 
> pods should have configurable read and connect timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425560889



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer extends FlinkKafkaConsumer {
+   private final TypeSerializer serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, 
TypeInformationSerializationSchema schema, Properties props) {

Review comment:
   That's not easy, because I have to call the super constructor of 
FlinkKafkaConsumer.
   
   Schema is needed for the constructor of FlinkKafkaConsumer, and it cannot 
be null even if it is not needed (not sure).
   
   `super(topic, schema, props);`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425560889



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer extends FlinkKafkaConsumer {
+   private final TypeSerializer serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, 
TypeInformationSerializationSchema schema, Properties props) {

Review comment:
   That's not easy, because I have to call the super constructor of 
FlinkKafkaConsumer.
   
   Schema is needed for the constructor of FlinkKafkaConsumer
   
   super(topic, schema, props);





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-15871) Support to start sidecar container

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15871:
--
Fix Version/s: (was: 1.11.0)
   1.12.0

> Support to start sidecar container
> --
>
> Key: FLINK-15871
> URL: https://issues.apache.org/jira/browse/FLINK-15871
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Yang Wang
>Priority: Major
> Fix For: 1.12.0
>
>
> >> How does a sidecar container work?
> A sidecar container runs beside the JobManager and TaskManager 
> containers. It could be used to collect logs or debug problems. For 
> example, if we configure the sidecar container to run fluentd and share the 
> TaskManager log via a volume, it could upload the logs to 
> HDFS, Elasticsearch, etc. We could also start a sidecar container with a 
> debugging image which contains lots of tools and helps to debug network 
> problems.
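
For illustration, a hypothetical pod snippet for the fluentd case described 
above (image versions and paths are made up):
{code}
containers:
  - name: flink-taskmanager
    image: flink:1.10.0
    volumeMounts:
      - name: flink-logs
        mountPath: /opt/flink/log     # TaskManager writes its log here
  - name: fluentd-sidecar
    image: fluent/fluentd:v1.10
    volumeMounts:
      - name: flink-logs
        mountPath: /var/log/flink     # sidecar tails the shared log
volumes:
  - name: flink-logs
    emptyDir: {}
{code}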



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425560889



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+   private final TypeSerializer<T> serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, 
TypeInformationSerializationSchema<T> schema, Properties props) {

Review comment:
   That's not easy, because I have to call the super constructor of 
FlinkKafkaConsumer.
   
   The schema is needed for the constructor of FlinkKafkaConsumer, and it cannot 
be null even if it is not otherwise needed (not sure):
   
   super(topic, schema, props);
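
   A minimal sketch (names are hypothetical; Base stands in for FlinkKafkaConsumer) 
of the Java constraint being discussed: a subclass constructor must chain to an 
accessible superclass constructor as its first statement, so the schema argument 
cannot simply be dropped:

   // Hypothetical sketch, not the actual Flink classes.
   class Base {
       Base(Object schema) { /* schema is dereferenced here, so null is unsafe */ }
   }
   class Derived extends Base {
       Derived(Object schema) {
           super(schema); // must be the first statement; the argument is required
       }
   }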





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-15871) Support to start sidecar container

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15871:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to start sidecar container
> --
>
> Key: FLINK-15871
> URL: https://issues.apache.org/jira/browse/FLINK-15871
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Yang Wang
>Priority: Major
> Fix For: 1.12.0
>
>
> >> How does the sidecar container work?
> A sidecar container runs beside the JobManager and TaskManager 
> containers. It can be used to collect logs or to debug problems. For 
> example, if we configure the sidecar container as fluentd and share the 
> TaskManager log via a volume, it can upload the logs to 
> HDFS, Elasticsearch, etc. We could also start a sidecar container with a 
> debugging image that contains lots of tools and helps to debug network 
> problems.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15642) Support to set JobManager readiness and liveness check

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15642:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to set JobManager readiness and liveness check
> --
>
> Key: FLINK-15642
> URL: https://issues.apache.org/jira/browse/FLINK-15642
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> The liveness of a TaskManager will be controlled by the Flink Master: when it 
> fails or times out, a new pod will be started to replace it. We need to add a 
> liveness check for the JobManager.
>  
> It is just like what we could do in the yaml:
> {code:java}
> ...
> livenessProbe:
>   tcpSocket:
> port: 6123
>   initialDelaySeconds: 30
>   periodSeconds: 60
> ...{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15642) Support to set JobManager readiness and liveness check

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15642:
--
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support to set JobManager readiness and liveness check
> --
>
> Key: FLINK-15642
> URL: https://issues.apache.org/jira/browse/FLINK-15642
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> The liveness of a TaskManager will be controlled by the Flink Master: when it 
> fails or times out, a new pod will be started to replace it. We need to add a 
> liveness check for the JobManager.
>  
> It is just like what we could do in the yaml:
> {code:java}
> ...
> livenessProbe:
>   tcpSocket:
> port: 6123
>   initialDelaySeconds: 30
>   periodSeconds: 60
> ...{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15641) Support to start init container

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15641:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to start init container
> ---
>
> Key: FLINK-15641
> URL: https://issues.apache.org/jira/browse/FLINK-15641
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> >> Why do we need an init container?
> The init container could be used to prepare the user jars and dependencies. 
> Then we could always set the user image to the official Flink image, both for 
> standalone per-job on K8s and native K8s per-job. When the JobManager and 
> TaskManager containers launch, the user jars will already exist there. I 
> think many users are running standalone per-job clusters in production 
> this way.
> The init container only works for K8s clusters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425559040



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {

Review comment:
   That's a good question. 
   
   It is because what I really override are the fetcher's methods (how the fetcher 
fetches records, deserializes records, and emits watermarks). That's why I have to 
subclass the fetcher anyway.
   
   I tried different ways; there is one way to avoid duplicated code, but it needs 
a default constructor added to either the consumer or the fetcher base class.
   
   I think that is unsafe to do, and that's why it ended up like this.
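
   A minimal sketch (names are hypothetical) of the situation described above: the 
record-handling hook is an instance method of the fetcher, so customizing it 
requires a subclass, and the base class offers no lightweight default constructor 
to chain to:

   // Hypothetical sketch, not the actual Flink classes.
   abstract class FetcherBase<T> {
       FetcherBase(String topic) { /* heavyweight setup; no default constructor */ }
       protected abstract void emitRecord(T record); // the hook being overridden
   }
   class ShuffleFetcher<T> extends FetcherBase<T> {
       ShuffleFetcher(String topic) {
           super(topic); // the setup cannot be bypassed safely
       }
       @Override
       protected void emitRecord(T record) {
           // custom deserialization and watermark emission would go here
       }
   }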





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai commented on pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

2020-05-14 Thread GitBox


tzulitai commented on pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#issuecomment-629018405


   @igalshilman I've addressed all comments except for the one about using 
Docker named volumes. Please let me know what you think, and if there are no 
further objections, I'll merge this PR. Thank you!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Zakelly commented on pull request #12112: [FLINK-16075][docs-zh] Translate "The Broadcast State Pattern" page into Chinese

2020-05-14 Thread GitBox


Zakelly commented on pull request #12112:
URL: https://github.com/apache/flink/pull/12112#issuecomment-629017825


   > @Zakelly thanks for your contribution, I left some comments.
   
   Thanks for the detailed review from @klion26 . I have made a new commit.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17709) Active Kubernetes integration phase 3 - Advanced Features

2020-05-14 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107924#comment-17107924
 ] 

Canbin Zheng commented on FLINK-17709:
--

Sounds great!

> Active Kubernetes integration phase 3 - Advanced Features
> -
>
> Key: FLINK-17709
> URL: https://issues.apache.org/jira/browse/FLINK-17709
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes, Runtime / Coordination
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> This is the umbrella issue to track all the advanced features for phase 3 of 
> active Kubernetes integration in Flink 1.12.0. Some of the features are as 
> follows:
> # Support multiple JobManagers in ZooKeeper based HA setups.
> # Support user-specified pod templates.
> # Support FileSystem based high availability.
> # Support running PyFlink.
> # Support accessing secured services via K8s secrets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Myasuka commented on pull request #8693: [FLINK-8871] Support to cancel checkpoing via notification

2020-05-14 Thread GitBox


Myasuka commented on pull request #8693:
URL: https://github.com/apache/flink/pull/8693#issuecomment-629017420


   For the 2nd question @romanovacca : after discussing with @zhijiangW offline, 
we believe this PR, with cancellation via RPC, would not take part in the data 
transmission in the channel; thus the upstream task would be unblocked once the 
channel is unblocked, which is the same logic as before.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17457) Manage Hive metadata via Flink DDL

2020-05-14 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li updated FLINK-17457:
---
Parent: (was: FLINK-17198)
Issue Type: Wish  (was: Sub-task)

> Manage Hive metadata via Flink DDL
> --
>
> Key: FLINK-17457
> URL: https://issues.apache.org/jira/browse/FLINK-17457
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / Hive
>Reporter: Rui Li
>Priority: Major
>
> To implement the Hive dialect, we encode lots of DDL semantics as properties. We 
> should decide whether/how to allow users to directly use these properties in 
> Flink DDL to achieve the same semantics.
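> A purely hypothetical illustration of the idea (the property key below is 
> invented for this sketch): a DDL semantic encoded as a table property would be 
> set directly in the WITH clause of a Flink DDL statement.
> {code:java}
> // Hypothetical sketch; 'hive.ddl.encoded-semantic' is an invented key.
> // Uses org.apache.flink.table.api.TableEnvironment and EnvironmentSettings.
> TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
> tEnv.executeSql(
>     "CREATE TABLE t (i INT) WITH ('hive.ddl.encoded-semantic' = 'some-value')");
> {code}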



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14460) Active Kubernetes integration phase 2 - Advanced Features

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-14460:
--
Description: 
This is phase 2 of the active Kubernetes integration. It is an umbrella Jira to 
track all the advanced features and make Flink on Kubernetes production ready.

 

Since K8s is fast evolving, many new features will be supported in the 
future. Recently, I have been thinking about which features should be supported 
in the Flink configuration directly. I have asked our users for suggestions and 
reached a very rough conclusion here.

> Very commonly used features. They should be directly supported in the Flink 
> configuration.
 * Kube config, namespace, service account, image(pull-policy, secrets), 
service exposed type
 * Annotations, Labels, NodeSelector
 * Toleration
 * Mem and cpu resources for jobmanager and taskmanager
 * Jobmanager deployment replication, support multiple jobmanagers to get fast 
recovery

> Uncommon, but required by a few production users. Maybe the pod template 
> FLINK-15656 is enough (will be supported in phase 3).
 * Affinity and anti-affinity
 * Volume mount, from persistent volume, user created configmap, etc.
 * Dns policy
 * Sidecar container, which will be used for debugging, log collecting, etc. In our 
internal production environment, we use a log4j2 custom appender to collect 
the jobmanager/taskmanager logs to persistent storage (HDFS, Aliyun OSS, etc.); 
see the sketch after this list.
 * Init container, various initialization before user program running, download 
user jars and dependencies, register with external system, etc. We provide two 
ways for the image choice, user specified image with users jars baked in or 
official image with init container.
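
A minimal sketch of the log4j2 custom-appender technique mentioned in the sidecar 
bullet above (the class name and the upload step are hypothetical; this is not the 
appender we actually run):
{code:java}
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;

import java.io.Serializable;

/** Hypothetical appender that ships each log event to shared persistent storage. */
@Plugin(name = "SharedStorageAppender", category = "Core", elementType = "appender")
public final class SharedStorageAppender extends AbstractAppender {

    private SharedStorageAppender(String name, Filter filter, Layout<? extends Serializable> layout) {
        super(name, filter, layout, true);
    }

    @Override
    public void append(LogEvent event) {
        byte[] bytes = getLayout().toByteArray(event);
        // A real implementation would batch these bytes and upload them to HDFS/OSS.
    }

    @PluginFactory
    public static SharedStorageAppender createAppender(
            @PluginAttribute("name") String name,
            @PluginElement("Layout") Layout<? extends Serializable> layout,
            @PluginElement("Filter") Filter filter) {
        return new SharedStorageAppender(name, filter, layout);
    }
}
{code}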

  was:
This is phase 2 of the active Kubernetes integration. It is an umbrella Jira to 
track all the advanced features and make Flink on Kubernetes production ready.

 

Since K8s is fast evolving, many new features will be supported in the 
future. Recently, I have been thinking about which features should be supported 
in the Flink configuration directly. I have asked our users for suggestions and 
reached a very rough conclusion here.

> Very commonly used features. They should be directly supported in the Flink 
> configuration.
 * Kube config, namespace, service account, image(pull-policy, secrets), 
service exposed type
 * Annotations, Labels, NodeSelector
 * Toleration
 * Mem and cpu resources for jobmanager and taskmanager
 * Jobmanager deployment replication, support multiple jobmanagers to get fast 
recovery

> Uncommon, but required by a few production users. Maybe the pod template 
> FLINK-15656 is enough.
 * Affinity and anti-affinity
 * Volume mount, from persistent volume, user created configmap, etc.
 * Dns policy
 * Sidecar container, which will be used for debugging, log collecting, etc. In our 
internal production environment, we use a log4j2 custom appender to collect 
the jobmanager/taskmanager logs to persistent storage (HDFS, Aliyun OSS, etc.)
 * Init container, various initialization before user program running, download 
user jars and dependencies, register with external system, etc. We provide two 
ways for the image choice, user specified image with users jars baked in or 
official image with init container.


> Active Kubernetes integration phase 2 - Advanced Features
> -
>
> Key: FLINK-14460
> URL: https://issues.apache.org/jira/browse/FLINK-14460
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
> Fix For: 1.11.0
>
>
> This is phase 2 of the active Kubernetes integration. It is an umbrella Jira to 
> track all the advanced features and make Flink on Kubernetes production ready.
>  
> Since K8s is fast evolving, many new features will be supported in the 
> future. Recently, I have been thinking about which features should be supported 
> in the Flink configuration directly. I have asked our users for suggestions 
> and reached a very rough conclusion here.
> > Very commonly used features. They should be directly supported in the Flink 
> > configuration.
>  * Kube config, namespace, service account, image(pull-policy, secrets), 
> service exposed type
>  * Annotations, Labels, NodeSelector
>  * Toleration
>  * Mem and cpu resources for jobmanager and taskmanager
>  * Jobmanager deployment replication, support multiple jobmanagers to get 
> fast recovery
> > Uncommon, but required by a few production users. Maybe the pod template 
> > FLINK-15656 is enough (will be supported in phase 3).
>  * Affinity and anti-affinity
>  * Volume mount, from persistent volume, user created configmap, etc.
>  * Dns policy
>  * Sidecar container, which will be used for debugging, log collecting, etc. In our 
> internal production environment, we use a log4j2 custom appender to collect 
> 

[GitHub] [flink] flinkbot edited a comment on pull request #12161: [FLINK-17450][sql-parser][hive] Implement function & catalog DDLs for Hive dialect

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12161:
URL: https://github.com/apache/flink/pull/12161#issuecomment-629011624


   
   ## CI report:
   
   * fb208fbfdf55bdde668c609a3305d938b8105f95 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1363)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-14986) Support to get detailed Kubernetes cluster description

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-14986:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to get detailed Kubernetes cluster description
> --
>
> Key: FLINK-14986
> URL: https://issues.apache.org/jira/browse/FLINK-14986
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Assignee: ouyangwulin
>Priority: Major
>
> Currently Flink supports getting the YARN cluster description via 
> `YarnClusterDescriptor#getClusterDescription`. We should support the same 
> behavior for Kubernetes clusters.
> Usually the cluster description contains the "total resources, available 
> resources, etc."
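> A minimal sketch (class name and output format are hypothetical) of the 
> analogous API shape being requested:
> {code:java}
> // Hypothetical sketch; a real implementation would aggregate node capacity
> // and allocatable resources from the K8s API server.
> public class KubernetesClusterDescriptorSketch {
>     public String getClusterDescription() {
>         return "total resources: cpu=24, memory=96Gi; available: cpu=10, memory=40Gi";
>     }
> }
> {code}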



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14986) Support to get detailed Kubernetes cluster description

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-14986:
--
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support to get detailed Kubernetes cluster description
> --
>
> Key: FLINK-14986
> URL: https://issues.apache.org/jira/browse/FLINK-14986
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Assignee: ouyangwulin
>Priority: Major
>
> Currently Flink supports getting the YARN cluster description via 
> `YarnClusterDescriptor#getClusterDescription`. We should support the same 
> behavior for Kubernetes clusters.
> Usually the cluster description contains the "total resources, available 
> resources, etc."



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12119: [FLINK-17649][table-planner-blink] Generated hash aggregate code may …

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12119:
URL: https://github.com/apache/flink/pull/12119#issuecomment-628150518


   
   ## CI report:
   
   * f2ab38fb3fd70e6fa9f9de2350dfb9b6bb4a477c Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1302)
 
   * 6492295182327f4a8289816abd20d6a8e66e35b1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1362)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   
   ## CI report:
   
   * 282fa84594e24e0028e0e87d882c6cd656fcd18b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1342)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-16760) Support the yaml file submission for native Kubernetes integration

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-16760:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support the yaml file submission for native Kubernetes integration
> --
>
> Key: FLINK-16760
> URL: https://issues.apache.org/jira/browse/FLINK-16760
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes, Deployment / Scripts
>Reporter: Yang Wang
>Priority: Major
>
> Currently, the native K8s integration is friendly and convenient for Flink 
> users, especially if they have some experience with YARN deployment. The 
> submission command and process are very similar and could be integrated into 
> their existing deployer (i.e. a job lifecycle management system).
> However, if you are a K8s user and prefer the K8s way to start the Flink 
> cluster (regarding it as an application), then the yaml way is more appropriate.
>  
> What's the difference between standalone on K8s[1] and this ticket?
>  # Dynamic resource allocation
>  # No need to create a taskmanager deployment yaml, since the taskmanagers 
> will be allocated by the Flink ResourceManager dynamically on demand. The 
> configmap, jobmanager deployment, and service still need to be created manually.
>  # Service account needs to be prepared beforehand[2].
>  # Some scripts (flink-console.sh, jobmanager.sh, etc.) need to be updated to 
> use the native K8s entrypoint 
> {{org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint}}
>  
> Do we have an alternative option?
> A possible way is using a K8s job (yaml file) to run 
> {{kubernetes-session.sh}} to start the Flink cluster. This just moves the 
> submission from the local machine to the K8s cluster.
>  
> [1]. 
> [https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html]
> [2]. 
> [https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#rbac]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12025: [FLINK-17435] [hive] Hive non-partitioned source supports streaming read

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #12025:
URL: https://github.com/apache/flink/pull/12025#issuecomment-625292121


   
   ## CI report:
   
   * 04db04d8d1b41ca05618c5b09b83b3031bf7ee11 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1313)
 
   * d4b70dbd62d22edbaf0c3ff54ca61a35ac93c04e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1356)
 
   * c38c037f31687b4d20ef0e183d42cc5f64d3bb45 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-15643) Support to start multiple Flink masters to achieve faster recovery

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15643:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support to start multiple Flink masters to achieve faster recovery
> --
>
> Key: FLINK-15643
> URL: https://issues.apache.org/jira/browse/FLINK-15643
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> The replicas of the Flink master deployment could be set to more than 1 to achieve 
> faster recovery. When the active Flink master fails, the standby one will 
> take over immediately.
> For standalone clusters, some users have already used this in production.
>  
> Even though ZooKeeper high availability could be used for leader election among 
> multiple Flink masters, we will do this feature after native K8s HA is supported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11385: [FLINK-11395][formats] Support for Avro StreamingFileSink

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #11385:
URL: https://github.com/apache/flink/pull/11385#issuecomment-597956562


   
   ## CI report:
   
   * 9e1bbeafa1a4d0f673ebba7f87ef9032253a29f0 UNKNOWN
   * 55bbaced3ca77aecde372a75505363cceb280fb6 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1226)
 
   * 88eb4f91c72b735fe947ea5beb4ab430138a6077 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1361)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-16760) Support the yaml file submission for native Kubernetes integration

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-16760:
--
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support the yaml file submission for native Kubernetes integration
> --
>
> Key: FLINK-16760
> URL: https://issues.apache.org/jira/browse/FLINK-16760
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes, Deployment / Scripts
>Reporter: Yang Wang
>Priority: Major
>
> Currently, the native K8s integration is friendly and convenient for Flink 
> users, especially if they have some experience with YARN deployment. The 
> submission command and process are very similar and could be integrated into 
> their existing deployer (i.e. a job lifecycle management system).
> However, if you are a K8s user and prefer the K8s way to start the Flink 
> cluster (regarding it as an application), then the yaml way is more appropriate.
>  
> What's the difference between standalone on K8s[1] and this ticket?
>  # Dynamic resource allocation
>  # No need to create a taskmanager deployment yaml, since the taskmanagers 
> will be allocated by the Flink ResourceManager dynamically on demand. The 
> configmap, jobmanager deployment, and service still need to be created manually.
>  # Service account needs to be prepared beforehand[2].
>  # Some scripts (flink-console.sh, jobmanager.sh, etc.) need to be updated to 
> use the native K8s entrypoint 
> {{org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint}}
>  
> Do we have an alternative option?
> A possible way is using a K8s job (yaml file) to run 
> {{kubernetes-session.sh}} to start the Flink cluster. This just moves the 
> submission from the local machine to the K8s cluster.
>  
> [1]. 
> [https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html]
> [2]. 
> [https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#rbac]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15643) Support to start multiple Flink masters to achieve faster recovery

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15643:
--
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support to start multiple Flink masters to achieve faster recovery
> --
>
> Key: FLINK-15643
> URL: https://issues.apache.org/jira/browse/FLINK-15643
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> The replicas of the Flink master deployment could be set to more than 1 to achieve 
> faster recovery. When the active Flink master fails, the standby one will 
> take over immediately.
> For standalone clusters, some users have already used this in production.
>  
> Even though ZooKeeper high availability could be used for leader election among 
> multiple Flink masters, we will do this feature after native K8s HA is supported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-05-14 Thread GitBox


flinkbot edited a comment on pull request #11010:
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412


   
   ## CI report:
   
   * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
 
   * 27818c93f9a8d1f580b96bca10111f0cb95bbb07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-15871) Support to start sidecar container

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15871:
--
Parent: (was: FLINK-14460)
Issue Type: New Feature  (was: Sub-task)

> Support to start sidecar container
> --
>
> Key: FLINK-15871
> URL: https://issues.apache.org/jira/browse/FLINK-15871
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Yang Wang
>Priority: Major
> Fix For: 1.11.0
>
>
> >> How does the sidecar container work?
> A sidecar container runs beside the JobManager and TaskManager 
> containers. It can be used to collect logs or to debug problems. For 
> example, if we configure the sidecar container as fluentd and share the 
> TaskManager log via a volume, it can upload the logs to 
> HDFS, Elasticsearch, etc. We could also start a sidecar container with a 
> debugging image that contains lots of tools and helps to debug network 
> problems.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17225) Support native k8s for scala shell

2020-05-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-17225:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Support native k8s for scala shell
> --
>
> Key: FLINK-17225
> URL: https://issues.apache.org/jira/browse/FLINK-17225
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala Shell
>Reporter: Yang Wang
>Priority: Major
>
> Currently, the Flink scala shell can create a new or retrieve an existing 
> YARN session cluster automatically. It is very convenient for the users.
> It would be great if we could also support K8s deployment. Benefiting from 
> the native K8s integration, the implementation is not very difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r42309



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+           String defaultTopicId,
+           TypeInformationSerializationSchema<IN> schema,
+           Properties props,
+           KeySelector<IN, KEY> keySelector,
+           Semantic semantic,
+           int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different levels of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException for kafka error
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+           ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+   pendingRecords.incrementAndGet();
+   transaction.getProducer().send(record, callback);
+   }
+
+   /**
+* This is the function invoked to handle each watermark.
+* 

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425554722



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+           String defaultTopicId,
+           TypeInformationSerializationSchema<IN> schema,
+           Properties props,
+           KeySelector<IN, KEY> keySelector,
+           Semantic semantic,
+           int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different levels of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException for kafka error
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+           ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));

Review comment:
   OK, I will do another pass. 





This is an automated message from the Apache Git Service.
To respond to the message, please log 

[jira] [Commented] (FLINK-17709) Active Kubernetes integration phase 3 - Advanced Features

2020-05-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107912#comment-17107912
 ] 

Yang Wang commented on FLINK-17709:
---

Thanks a lot for creating this umbrella ticket and for keeping the native K8s 
integration work going. I will move some features here as subtasks, which have 
not been done in phase 2 or are already planned for later.

> Active Kubernetes integration phase 3 - Advanced Features
> -
>
> Key: FLINK-17709
> URL: https://issues.apache.org/jira/browse/FLINK-17709
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes, Runtime / Coordination
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> This is the umbrella issue to track all the advanced features for phase 3 of 
> active Kubernetes integration in Flink 1.12.0. Some of the features are as 
> follows:
> # Support multiple JobManagers in ZooKeeper based HA setups.
> # Support user-specified pod templates.
> # Support FileSystem based high availability.
> # Support running PyFlink.
> # Support accessing secured services via K8s secrets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425553720



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+           final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(1));
+   assertTrue(externalResources.containsKey(RESOURCE_NAME));
+   assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+           final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(0));
+   }
+
+   @Test
+   public void testSetAndGetExtendedResourcesWithYarnSupport() {
+   

[GitHub] [flink] Myasuka commented on a change in pull request #12153: [FLINK-16966][metrics][infuxdb] Convert InfluxDB reporter to plugin

2020-05-14 Thread GitBox


Myasuka commented on a change in pull request #12153:
URL: https://github.com/apache/flink/pull/12153#discussion_r425552234



##
File path: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporterFactory.java
##
@@ -0,0 +1,14 @@
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+public class InfluxdbReporterFactory implements MetricReporterFactory {

Review comment:
   This class is missing a javadoc and should not pass checkstyle.
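
   A minimal sketch of the kind of fix checkstyle expects here (the javadoc 
wording is only a suggestion):

   /**
    * {@link MetricReporterFactory} for creating {@link InfluxdbReporter} instances.
    */
   public class InfluxdbReporterFactory implements MetricReporterFactory {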





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



