[GitHub] tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test

2018-11-16 Thread GitBox
tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed 
checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7111#issuecomment-439315975
 
 
   Thanks for the review @zentol. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom

2018-11-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10877.
---
Resolution: Fixed

Fixed via
master: 
https://github.com/apache/flink/commit/00274233b40230559f942ae13fc1d19e54f79fbe
1.7.0: 
https://github.com/apache/flink/commit/4c52cdbda1fb13754bad4c5ad4a7eb81ba3884f5

> Remove duplicate dependency entries in kafka connector pom
> --
>
> Key: FLINK-10877
> URL: https://issues.apache.org/jira/browse/FLINK-10877
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Trivial
>  Labels: pull-request-available
>
> The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency entries 
> for {{flink-connector-kafka-base}} and, moreover, excludes dependencies from a 
> test-jar. Neither is necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10705) Rework Flink Web Dashboard

2018-11-16 Thread Fabian Wollert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689145#comment-16689145
 ] 

Fabian Wollert commented on FLINK-10705:


Build and deployment to our dev cluster worked fine. Since I'm not familiar 
with Angular and/or TypeScript, I unfortunately can't review the actual code. And 
since there is not much content/functionality to check yet, I can't be of much 
help here so far :(

> Rework Flink Web Dashboard
> --
>
> Key: FLINK-10705
> URL: https://issues.apache.org/jira/browse/FLINK-10705
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.6.2
>Reporter: Fabian Wollert
>Assignee: Fabian Wollert
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: image-2018-10-29-09-17-24-115.png
>
>
> The Flink Dashboard is currently very simple and should be updated. This is 
> the umbrella ticket for the related tickets; please check the sub-tickets for 
> details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup flink-connector-kafka pom file

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup 
flink-connector-kafka pom file
URL: https://github.com/apache/flink/pull/7094
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index b90cea2093a..67ec39d8f27 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -140,28 +140,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
 			<type>test-jar</type>
 			<scope>test</scope>



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689146#comment-16689146
 ] 

ASF GitHub Bot commented on FLINK-10877:


tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup 
flink-connector-kafka pom file
URL: https://github.com/apache/flink/pull/7094
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index b90cea2093a..67ec39d8f27 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -140,28 +140,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.kafka</groupId>
-					<artifactId>kafka_${scala.binary.version}</artifactId>
-				</exclusion>
-			</exclusions>
 			<type>test-jar</type>
 			<scope>test</scope>



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove duplicate dependency entries in kafka connector pom
> --
>
> Key: FLINK-10877
> URL: https://issues.apache.org/jira/browse/FLINK-10877
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Trivial
>  Labels: pull-request-available
>
> The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency entries 
> for {{flink-connector-kafka-base}} and, moreover, excludes dependencies from a 
> test-jar. Neither is necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7113: [BP-1.6][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7113: [BP-1.6][FLINK-10856] Find latest 
completed checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7113
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 9719e183cf0..5efdf0a974a 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -659,3 +659,10 @@ function expect_in_taskmanager_logs {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed 
checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ea267538639..4e6254864c1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -656,3 +656,10 @@ function wait_for_restart_to_complete {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #7112: [BP-1.7][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7112: [BP-1.7][FLINK-10856] Find latest 
completed checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7112
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ea267538639..4e6254864c1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -656,3 +656,10 @@ function wait_for_restart_to_complete {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10856) Harden resume from externalized checkpoint E2E test

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689148#comment-16689148
 ] 

ASF GitHub Bot commented on FLINK-10856:


tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed 
checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7111#issuecomment-439315975
 
 
   Thanks for the review @zentol. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Harden resume from externalized checkpoint E2E test
> ---
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, State Backends, Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The resume from externalized checkpoints E2E test can fail due to 
> FLINK-10855. We should harden the test script so that it does not expect a 
> single checkpoint directory to be present but instead takes the checkpoint 
> with the highest checkpoint counter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7114: [BP-1.5][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7114: [BP-1.5][FLINK-10856] Find latest 
completed checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7114
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 018e12d6df0..8b370582149 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -531,3 +531,10 @@ function clean_log_files {
   rm ${FLINK_DIR}/log/*
   echo "Deleted all files under ${FLINK_DIR}/log/"
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e0b97d6f6a8..f406784b4c5 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -113,7 +113,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689159#comment-16689159
 ] 

ASF GitHub Bot commented on FLINK-10843:


twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table 
factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087#issuecomment-439317653
 
 
   @pnowojski I updated the PR. Now we just match against the version `universal`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make Kafka version definition more flexible for new Kafka table factory
> ---
>
> Key: FLINK-10843
> URL: https://issues.apache.org/jira/browse/FLINK-10843
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, a user has to specify a specific version for a Kafka connector 
> like:
> {code}
> connector:
>   type: kafka
>   version: "0.11" # required: valid connector versions are "0.8", "0.9", 
> "0.10", and "0.11"
>   topic: ...  # required: topic name from which the table is read
> {code}
> However, the new Kafka connector aims to be universal; thus, at least the 1.x 
> and 2.x versions should be supported as parameters as well. 
> Currently, {{2.0}} is the only string accepted by the factory.
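
For illustration only, a minimal SQL Client YAML sketch of such a connector 
definition with the relaxed version string; the {{universal}} value comes from 
the PR discussion above, while the topic value is a placeholder:
{code}
connector:
  type: kafka
  version: "universal"   # matched by the new universal Kafka table factory
  topic: input-topic     # placeholder topic name
{code}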



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible

2018-11-16 Thread GitBox
twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table 
factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087#issuecomment-439317653
 
 
   @pnowojski I updated the PR. Now we just match against the version `universal`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ambition119 opened a new pull request #7120: fix up RowSerializer.copy occur ClassCastException

2018-11-16 Thread GitBox
ambition119 opened a new pull request #7120: fix up RowSerializer.copy occur 
ClassCastException
URL: https://github.com/apache/flink/pull/7120
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   
   
   I created JIRA 
[FLINK-10299](https://issues.apache.org/jira/browse/FLINK-10299); this pull 
request fixes the ClassCastException that occurs in RowSerializer.copy.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread xymaqingxiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xymaqingxiang updated FLINK-10908:
--
Description: 
Add yarn.application.priority in YarnConfigOptions; it changes the priority of the 
Flink YARN application.

Allowed priority values are 5, 4, 3, 2, and 1, which correspond to VERY_HIGH, 
HIGH, NORMAL, LOW, and VERY_LOW respectively.

  was:Add yarn.application.priority in YarnConfigOptions, Changes the priority 
of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.


> Add yarn.application.priority in YarnConfigOptions
> --
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Add yarn.application.priority in YarnConfigOptions; it changes the priority of 
> the Flink YARN application.
> Allowed priority values are 5, 4, 3, 2, and 1, which correspond to VERY_HIGH, 
> HIGH, NORMAL, LOW, and VERY_LOW respectively.
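
As a sketch only, the proposed option could then be set in flink-conf.yaml as 
follows; the key and the numeric values are taken from the description above, 
and the option does not exist yet at this point:
{code}
# flink-conf.yaml (proposed option, not yet available)
yarn.application.priority: 5   # 5=VERY_HIGH, 4=HIGH, 3=NORMAL, 2=LOW, 1=VERY_LOW
{code}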



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with 
NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		deploymentFuture.whenComplete(
 			(Void ignored, Throwable failure) -> {
 				if (failure != null) {
-					markFailed(ExceptionUtils.stripCompletionException(failure));
+					final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+					final Throwable schedulingFailureCause;
+
+					if (stripCompletionException instanceof TimeoutException) {
+						schedulingFailureCause = new NoResourceAvailableException(
+							"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+								"Please make sure that the cluster has enough resources.");
+					} else {
+						schedulingFailureCause = stripCompletionException;
+					}
+
+					markFailed(schedulingFailureCause);
 				}
 			});
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 			final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 				slotProvider,
 				allowQueuedScheduling,
-				LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+				LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 				Collections.emptySet());
 
 			schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
 	}
 
 	@Test
-	public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+	public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
+		try {
+			setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+			fail("Job should fail.");
+		}

[GitHub] tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out

2018-11-16 Thread GitBox
tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with 
NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115#issuecomment-439316401
 
 
   Thanks for the review @zentol. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10856) Harden resume from externalized checkpoint E2E test

2018-11-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-10856.
-
Resolution: Fixed

Fixed via
master: 
https://github.com/apache/flink/commit/bddfe23d7df18099982958d21f7bcd01039909b9
1.7.0: 
https://github.com/apache/flink/commit/6c21335004ce487e104bc85bd3aae211e1319a20
1.6.3: 
https://github.com/apache/flink/commit/b4e6d9b03facaf572caddaed5a70afcbffd083a2
1.5.6: 
https://github.com/apache/flink/commit/0c27eda46ccf88ca794bb7831a788df303a5967e

> Harden resume from externalized checkpoint E2E test
> ---
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, State Backends, Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The resume from externalized checkpoints E2E test can fail due to 
> FLINK-10855. We should harden the test script so that it does not expect a 
> single checkpoint directory to be present but instead takes the checkpoint 
> with the highest checkpoint counter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch 
jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7117
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		deploymentFuture.whenComplete(
 			(Void ignored, Throwable failure) -> {
 				if (failure != null) {
-					markFailed(ExceptionUtils.stripCompletionException(failure));
+					final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+					final Throwable schedulingFailureCause;
+
+					if (stripCompletionException instanceof TimeoutException) {
+						schedulingFailureCause = new NoResourceAvailableException(
+							"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+								"Please make sure that the cluster has enough resources.");
+					} else {
+						schedulingFailureCause = stripCompletionException;
+					}
+
+					markFailed(schedulingFailureCause);
 				}
 			});
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 			final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 				slotProvider,
 				allowQueuedScheduling,
-				LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+				LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 				Collections.emptySet());
 
 			schedulingFutures.add(schedulingJobVertexFuture);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #7116: [BP-1.7][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out

2018-11-16 Thread GitBox
tillrohrmann closed pull request #7116: [BP-1.7][FLINK-10883] Failing batch 
jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7116
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		deploymentFuture.whenComplete(
 			(Void ignored, Throwable failure) -> {
 				if (failure != null) {
-					markFailed(ExceptionUtils.stripCompletionException(failure));
+					final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+					final Throwable schedulingFailureCause;
+
+					if (stripCompletionException instanceof TimeoutException) {
+						schedulingFailureCause = new NoResourceAvailableException(
+							"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+								"Please make sure that the cluster has enough resources.");
+					} else {
+						schedulingFailureCause = stripCompletionException;
+					}
+
+					markFailed(schedulingFailureCause);
 				}
 			});
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 			final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 				slotProvider,
 				allowQueuedScheduling,
-				LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+				LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 				Collections.emptySet());
 
 			schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
 	}
 
 	@Test
-	public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+	public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
+		try {
+			setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+			fail("Job should fail.");
+

[jira] [Commented] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689153#comment-16689153
 ] 

ASF GitHub Bot commented on FLINK-10883:


tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with 
NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		deploymentFuture.whenComplete(
 			(Void ignored, Throwable failure) -> {
 				if (failure != null) {
-					markFailed(ExceptionUtils.stripCompletionException(failure));
+					final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+					final Throwable schedulingFailureCause;
+
+					if (stripCompletionException instanceof TimeoutException) {
+						schedulingFailureCause = new NoResourceAvailableException(
+							"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+								"Please make sure that the cluster has enough resources.");
+					} else {
+						schedulingFailureCause = stripCompletionException;
+					}
+
+					markFailed(schedulingFailureCause);
 				}
 			});
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 			final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 				slotProvider,
 				allowQueuedScheduling,
-				LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+				LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 				Collections.emptySet());
 
 			schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
 	}
 
 	@Test
-	public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+	public 

[jira] [Resolved] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout

2018-11-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10883.
---
Resolution: Fixed

Fixed via
master: 
https://github.com/apache/flink/commit/b1965eeb854f16f385329a546d47d684e2891d70
1.7.0: 
https://github.com/apache/flink/commit/f5db114940fad1b96b549801c80b312a9367a55a
1.6.3: 
https://github.com/apache/flink/commit/fd6c3eea8c8c9d371e2cdc5e98ce650be5b5fc37

> Submitting a job without enough slots times out due to an unspecified timeout
> -
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> When submitting a job without enough slots being available, the job will stay 
> in a SCHEDULED/CREATED state. After some time (a few minutes), the job 
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Pending slot 
> request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
> (1/$java.util.concurrent.TimeoutException
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> That the job submission may time out is not documented; neither is which 
> timeout is responsible in the first place, nor how or whether it can be 
> disabled.
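
For reference, a minimal flink-conf.yaml sketch that raises the timeout behind 
the pending slot request in the log above, assuming it is governed by the 
{{slot.request.timeout}} option (value in milliseconds):
{code}
# flink-conf.yaml sketch: raise the slot request timeout (milliseconds)
slot.request.timeout: 600000
{code}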



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689151#comment-16689151
 ] 

ASF GitHub Bot commented on FLINK-10883:


tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with 
NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115#issuecomment-439316401
 
 
   Thanks for the review @zentol. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Submitting a job without enough slots times out due to an unspecified timeout
> -
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> When submitting a job without enough slots being available, the job will stay 
> in a SCHEDULED/CREATED state. After some time (a few minutes), the job 
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Pending slot 
> request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
> (1/$java.util.concurrent.TimeoutException
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> That the job submission may time out is not documented; neither is which 
> timeout is responsible in the first place, nor how or whether it can be 
> disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10856) Harden resume from externalized checkpoint E2E test

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689149#comment-16689149
 ] 

ASF GitHub Bot commented on FLINK-10856:


tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed 
checkpoint for resume from externalized checkpoint e2e test
URL: https://github.com/apache/flink/pull/7111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ea267538639..4e6254864c1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -656,3 +656,10 @@ function wait_for_restart_to_complete {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Harden resume from externalized checkpoint E2E test
> ---
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, State Backends, Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The resume from externalized checkpoints E2E test can fail due to 
> FLINK-10855. We should harden the test script so that it does not expect a 
> single checkpoint directory to be present but instead takes the checkpoint 
> with the highest checkpoint counter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread xymaqingxiang (JIRA)
xymaqingxiang created FLINK-10908:
-

 Summary: Add yarn.application.priority in YarnConfigOptions
 Key: FLINK-10908
 URL: https://issues.apache.org/jira/browse/FLINK-10908
 Project: Flink
  Issue Type: Improvement
Reporter: xymaqingxiang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery.

2018-11-16 Thread GitBox
zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics 
from getting stuck after a job recovery.
URL: https://github.com/apache/flink/pull/7119
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index e09051d7160..f67b49d6745 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -61,16 +61,17 @@ public String hostname() {
 	public JobManagerJobMetricGroup addJob(JobGraph job) {
 		JobID jobId = job.getJobID();
 		String jobName = job.getName();
-		// get or create a jobs metric group
-		JobManagerJobMetricGroup currentJobGroup;
 		synchronized (this) {
 			if (!isClosed()) {
-				currentJobGroup = jobs.get(jobId);
+				JobManagerJobMetricGroup currentJobGroup = jobs.get(jobId);
 
-				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
-					jobs.put(jobId, currentJobGroup);
+				if (currentJobGroup != null) {
+					currentJobGroup.close();
 				}
+
+				currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
+				jobs.put(jobId, currentJobGroup);
+
 				return currentJobGroup;
 			} else {
 				return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index cb5ec67c97c..146fb3b1f45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -32,6 +32,7 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -58,13 +59,15 @@ public void addAndRemoveJobs() throws Exception {
 		JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new JobGraph(jid1, jobName1));
 		JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2));
 
-		assertEquals(jmJobGroup11, jmJobGroup12);
+		assertNotEquals(jmJobGroup11, jmJobGroup12);
+		assertTrue(jmJobGroup11.isClosed());
+		assertTrue(!jmJobGroup12.isClosed());
 
 		assertEquals(2, group.numRegisteredJobMetricGroups());
 
 		group.removeJob(jid1);
 
-		assertTrue(jmJobGroup11.isClosed());
+		assertTrue(jmJobGroup12.isClosed());
 		assertEquals(1, group.numRegisteredJobMetricGroups());
 
 		group.removeJob(jid2);

 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values

2018-11-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10907.

Resolution: Won't Fix

Not a problem in 1.5 and above. The JobManagerJobMetricGroup is closed when a 
{{JobMaster}} exits and is properly overwritten on recovery in 
{{JobManagerMetricGroup#addJob}}.

> Job recovery on the same JobManager causes JobManager metrics to report stale 
> values
> 
>
> Key: FLINK-10907
> URL: https://issues.apache.org/jira/browse/FLINK-10907
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Metrics
>Affects Versions: 1.4.2
> Environment: Verified the bug and the fix running on Flink 1.4
> Based on the JobManagerMetricGroup.java code in master, this issue should 
> still occur on Flink versions after 1.4.
>Reporter: Mark Cho
>Priority: Minor
>  Labels: pull-request-available
>
> https://github.com/apache/flink/pull/7119
>  * JobManager loses and regains leadership if it loses connection and 
> reconnects to ZooKeeper.
>  * When it regains the leadership, it tries to recover the job graph.
>  * During the recovery, it will try to reuse the existing 
> {{JobManagerMetricGroup}} to register new counters and gauges under the same 
> metric name, which causes the new counters and gauges to be registered 
> incorrectly.
>  * The old counters and gauges will continue to report the stale values, and 
> the new counters and gauges will not report the latest metrics.
> Relevant lines from logs
> {code:java}
> com.---.JobManager - Submitting recovered job 
> e9e49fd9b8c61cf54b435f39aa49923f.
> com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f 
> (flink-job) (Recovery).
> com.---.JobManager - Running initialization on master for job flink-job 
> (e9e49fd9b8c61cf54b435f39aa49923f).
> com.---.JobManager - Successfully ran initialization on master in 0 ms.
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointDuration'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointExternalPath'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'restartingTime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'downtime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'uptime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'fullRestarts'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'task_failures'. Metric will not be reported.[]
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689414#comment-16689414
 ] 

ASF GitHub Bot commented on FLINK-10907:


zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics 
from getting stuck after a job recovery.
URL: https://github.com/apache/flink/pull/7119
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index e09051d7160..f67b49d6745 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -61,16 +61,17 @@ public String hostname() {
public JobManagerJobMetricGroup addJob(JobGraph job) {
JobID jobId = job.getJobID();
String jobName = job.getName();
-   // get or create a jobs metric group
-   JobManagerJobMetricGroup currentJobGroup;
synchronized (this) {
if (!isClosed()) {
-   currentJobGroup = jobs.get(jobId);
+   JobManagerJobMetricGroup currentJobGroup = 
jobs.get(jobId);
 
-   if (currentJobGroup == null || 
currentJobGroup.isClosed()) {
-   currentJobGroup = new 
JobManagerJobMetricGroup(registry, this, jobId, jobName);
-   jobs.put(jobId, currentJobGroup);
+   if (currentJobGroup != null) {
+   currentJobGroup.close();
}
+
+   currentJobGroup = new 
JobManagerJobMetricGroup(registry, this, jobId, jobName);
+   jobs.put(jobId, currentJobGroup);
+
return currentJobGroup;
} else {
return null;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index cb5ec67c97c..146fb3b1f45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -32,6 +32,7 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -58,13 +59,15 @@ public void addAndRemoveJobs() throws Exception {
JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new 
JobGraph(jid1, jobName1));
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new 
JobGraph(jid2, jobName2));
 
-   assertEquals(jmJobGroup11, jmJobGroup12);
+   assertNotEquals(jmJobGroup11, jmJobGroup12);
+   assertTrue(jmJobGroup11.isClosed());
+   assertTrue(!jmJobGroup12.isClosed());
 
assertEquals(2, group.numRegisteredJobMetricGroups());
 
group.removeJob(jid1);
 
-   assertTrue(jmJobGroup11.isClosed());
+   assertTrue(jmJobGroup12.isClosed());
assertEquals(1, group.numRegisteredJobMetricGroups());
 
group.removeJob(jid2);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job recovery on the same JobManager causes JobManager metrics to report stale 
> values
> 
>
> Key: FLINK-10907
> URL: https://issues.apache.org/jira/browse/FLINK-10907
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Metrics
>Affects Versions: 1.4.2
> Environment: Verified the bug and the fix running on Flink 1.4
> Based on the JobManagerMetricGroup.java code in master, this issue should 
> still occur on Flink versions after 1.4.
>Reporter: Mark Cho
>Priority: Minor
>  Labels: pull-request-available
>
> 

[GitHub] zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery.

2018-11-16 Thread GitBox
zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics 
from getting stuck after a job recovery.
URL: https://github.com/apache/flink/pull/7119#issuecomment-439391428
 
 
   Not a problem in 1.5 and above, see the JIRA for more details.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689413#comment-16689413
 ] 

ASF GitHub Bot commented on FLINK-10907:


zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics 
from getting stuck after a job recovery.
URL: https://github.com/apache/flink/pull/7119#issuecomment-439391428
 
 
   Not a problem in 1.5 and above, see the JIRA for more details.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job recovery on the same JobManager causes JobManager metrics to report stale 
> values
> 
>
> Key: FLINK-10907
> URL: https://issues.apache.org/jira/browse/FLINK-10907
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Metrics
>Affects Versions: 1.4.2
> Environment: Verified the bug and the fix running on Flink 1.4
> Based on the JobManagerMetricGroup.java code in master, this issue should 
> still occur on Flink versions after 1.4.
>Reporter: Mark Cho
>Priority: Minor
>  Labels: pull-request-available
>
> https://github.com/apache/flink/pull/7119
>  * JobManager loses and regains leadership if it loses connection and 
> reconnects to ZooKeeper.
>  * When it regains the leadership, it tries to recover the job graph.
>  * During the recovery, it will try to reuse the existing 
> {{JobManagerMetricGroup}} to register new counters and gauges under the same 
> metric name, which causes the new counters and gauges to be registered 
> incorrectly.
>  * The old counters and gauges will continue to
>  report the stale values and the new counters and gauges will not report
>  the latest metric.
> Relevant lines from logs
> {code:java}
> com.---.JobManager - Submitting recovered job 
> e9e49fd9b8c61cf54b435f39aa49923f.
> com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f 
> (flink-job) (Recovery).
> com.---.JobManager - Running initialization on master for job flink-job 
> (e9e49fd9b8c61cf54b435f39aa49923f).
> com.---.JobManager - Successfully ran initialization on master in 0 ms.
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointDuration'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointExternalPath'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'restartingTime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'downtime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'uptime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'fullRestarts'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'task_failures'. Metric will not be reported.[]
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7120: fix up RowSerializer.copy occur ClassCastException

2018-11-16 Thread GitBox
zentol commented on issue #7120: fix up RowSerializer.copy occur 
ClassCastException
URL: https://github.com/apache/flink/pull/7120#issuecomment-439391864
 
 
   Please open a JIRA before opening a PR, and include a description as to when 
this issue can occur. Additionally we require a test to reproduce the issue and 
to ensure this issue isn't re-introduced later on.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol closed pull request #7120: fix up RowSerializer.copy occur ClassCastException

2018-11-16 Thread GitBox
zentol closed pull request #7120: fix up RowSerializer.copy occur 
ClassCastException
URL: https://github.com/apache/flink/pull/7120
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index e29f6813254..5d3a9e50667 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -33,6 +33,8 @@
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.List;
 
@@ -92,7 +94,12 @@ public Row copy(Row from) {
Row result = new Row(len);
for (int i = 0; i < len; i++) {
Object fromField = from.getField(i);
-   if (fromField != null) {
+
+   String fromTypeName = 
fromField.getClass().getTypeName();
+   Type fieldSerializerType = ((ParameterizedType) 
fieldSerializers[i].getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+   String fieldSerializerTypeName = 
fieldSerializerType.getTypeName();
+
+   if (fromField != null && 
fieldSerializerTypeName.equals(fromTypeName)) {
Object copy = 
fieldSerializers[i].copy(fromField);
result.setField(i, copy);
} else {
@@ -120,12 +127,20 @@ public Row copy(Row from, Row reuse) {
Object fromField = from.getField(i);
if (fromField != null) {
Object reuseField = reuse.getField(i);
-   if (reuseField != null) {
+
+   String fromTypeName = 
fromField.getClass().getTypeName();
+   String reuseTypeName = 
reuseField.getClass().getTypeName();
+   Type fieldSerializerType = ((ParameterizedType) 
fieldSerializers[i].getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+   String fieldSerializerTypeName = 
fieldSerializerType.getTypeName();
+
+   if (reuseField != null && 
fieldSerializerTypeName.equals(fromTypeName) && 
fromTypeName.equals(reuseTypeName)) {
Object copy = 
fieldSerializers[i].copy(fromField, reuseField);
reuse.setField(i, copy);
-   } else {
+   } else if 
(fieldSerializerTypeName.equals(fromTypeName)){
Object copy = 
fieldSerializers[i].copy(fromField);
reuse.setField(i, copy);
+   } else {
+   reuse.setField(i, null);
}
} else {
reuse.setField(i, null);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file systems without Hadoop dependencies

2018-11-16 Thread GitBox
wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file 
systems without Hadoop dependencies
URL: https://github.com/apache/flink/pull/7123
 
 
   
   ## What is the purpose of the change
   
   This PR adds an implementation of a file system that reads from & writes 
to Aliyun OSS so that users can use OSS with Flink without depending on Hadoop. 
In this way, users will find it easier to use OSS with Flink.
   
   This implementation wraps **AliyunOSSFileSystem** and shades its 
dependencies. However, the wrapped jar is not in Flink's lib directory; users 
need to copy the jar built into the **opt** directory to the **lib** directory.
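   
   As a rough usage sketch (not part of this PR's code; the `oss://` paths, the 
bucket name, and the `fs.oss.*` keys are assumptions in the style of the Hadoop 
OSS connector), a batch job could then read from and write to OSS like this:
   
```java
// Assumed flink-conf.yaml entries (names follow the Hadoop OSS connector):
//   fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
//   fs.oss.accessKeyId: <your-access-key-id>
//   fs.oss.accessKeySecret: <your-access-key-secret>

import org.apache.flink.api.java.ExecutionEnvironment;

public class OssReadWriteSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Works once the flink-oss-fs-hadoop jar has been copied from opt/ to lib/.
        env.readTextFile("oss://my-bucket/input/")
           .writeAsText("oss://my-bucket/output/");

        env.execute("OSS read/write sketch");
    }
}
```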
   
   
   ## Brief change log
   
   - Adds **flink-filesystems/flink-oss-fs-hadoop**
   
   
   ## Verifying this change
   
   This implementation adds tests covering instantiation as well as read, write, 
and list operations (testing communication with Aliyun OSS). However, in order 
to run these tests, you need your own Aliyun access key id and access key 
secret. Then, set the environment variables below:
   `export ARTIFACTS_OSS_ENDPOINT=`
   `export ARTIFACTS_OSS_BUCKET=`
   `export ARTIFACTS_OSS_ACCESS_KEY=`
   `export ARTIFACTS_OSS_SECRET_KEY=`
   
   These tests are skipped by default.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10865) Implement Flink's own Aliyun OSS filesystem

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10865:
---
Labels: pull-request-available  (was: )

> Implement Flink's own Aliyun OSS filesystem
> ---
>
> Key: FLINK-10865
> URL: https://issues.apache.org/jira/browse/FLINK-10865
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Affects Versions: 1.6.2
>Reporter: wujinhu
>Priority: Major
>  Labels: pull-request-available
>
> Aliyun OSS is widely used among China’s cloud users, and Hadoop has supported 
> Aliyun OSS since 2.9.1. 
> Open this jira to wrap AliyunOSSFileSystem in Flink (similar to the S3 support), 
> so that users can read from & write to OSS more easily in Flink. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10865) Implement Flink's own Aliyun OSS filesystem

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689419#comment-16689419
 ] 

ASF GitHub Bot commented on FLINK-10865:


wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file 
systems without Hadoop dependencies
URL: https://github.com/apache/flink/pull/7123
 
 
   
   ## What is the purpose of the change
   
   This PR adds an implementation of a file system that reads from & writes 
to Aliyun OSS so that users can use OSS with Flink without depending on Hadoop. 
In this way, users will find it easier to use OSS with Flink.
   
   This implementation wraps **AliyunOSSFileSystem** and shades its 
dependencies. However, the wrapped jar is not in Flink's lib directory; users 
need to copy the jar built into the **opt** directory to the **lib** directory.
   
   
   ## Brief change log
   
   - Adds **flink-filesystems/flink-oss-fs-hadoop**
   
   
   ## Verifying this change
   
   This implementation adds tests covering instantiation as well as read, write, 
and list operations (testing communication with Aliyun OSS). However, in order 
to run these tests, you need your own Aliyun access key id and access key 
secret. Then, set the environment variables below:
   `export ARTIFACTS_OSS_ENDPOINT=`
   `export ARTIFACTS_OSS_BUCKET=`
   `export ARTIFACTS_OSS_ACCESS_KEY=`
   `export ARTIFACTS_OSS_SECRET_KEY=`
   
   These tests are skipped by default.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement Flink's own Aliyun OSS filesystem
> ---
>
> Key: FLINK-10865
> URL: https://issues.apache.org/jira/browse/FLINK-10865
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Affects Versions: 1.6.2
>Reporter: wujinhu
>Priority: Major
>  Labels: pull-request-available
>
> Aliyun OSS is widely used among China’s cloud users, and Hadoop has supported 
> Aliyun OSS since 2.9.1. 
> Open this jira to wrap AliyunOSSFileSystem in Flink (similar to the S3 support), 
> so that users can read from & write to OSS more easily in Flink. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread xymaqingxiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xymaqingxiang updated FLINK-10908:
--
Description: Add yarn.application.priority in YarnConfigOptions; it changes 
the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, 
LOW, VERY_LOW.  (was:  

Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, 
NORMAL, LOW, VERY_LOW)

> Add yarn.application.priority in YarnConfigOptions
> --
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Add yarn.application.priority in YarnConfigOptions; it changes the priority of 
> the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689246#comment-16689246
 ] 

ASF GitHub Bot commented on FLINK-10908:


maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add 
yarn.application.priority in YarnConfigOptions
URL: https://github.com/apache/flink/pull/7122
 
 
   ## What is the purpose of the change
   
   
   Add the priority of the Flink YARN application.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add yarn.application.priority in YarnConfigOptions
> --
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Add yarn.application.priority in YarnConfigOptions; it changes the priority of 
> the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread GitBox
maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add 
yarn.application.priority in YarnConfigOptions
URL: https://github.com/apache/flink/pull/7122
 
 
   ## What is the purpose of the change
   
   
   Add the priority of the Flink YARN application.
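   
   A minimal sketch of what such an option could look like in `YarnConfigOptions` 
(the exact shape, default value, and description are assumptions, not the final 
implementation):
   
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class YarnPriorityOptionSketch {

    /** Hypothetical option; allowed values: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW. */
    public static final ConfigOption<String> APPLICATION_PRIORITY =
        ConfigOptions.key("yarn.application.priority")
            .defaultValue("NORMAL")
            .withDescription("Priority of the Flink YARN application.");
}
```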
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10908:
---
Labels: pull-request-available  (was: )

> Add yarn.application.priority in YarnConfigOptions
> --
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Add yarn.application.priority in YarnConfigOptions; it changes the priority of 
> the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions

2018-11-16 Thread xymaqingxiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xymaqingxiang updated FLINK-10908:
--
Description: 
 

Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, 
NORMAL, LOW, VERY_LOW

> Add yarn.application.priority in YarnConfigOptions
> --
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>
>  
> Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, 
> NORMAL, LOW, VERY_LOW



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10909) Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5

2018-11-16 Thread Mohamed Amine ABDESSEMED (JIRA)
Mohamed Amine ABDESSEMED created FLINK-10909:


 Summary: Possible RocksDBStateBackend performance regression with 
rocksdbjni-5.7.5
 Key: FLINK-10909
 URL: https://issues.apache.org/jira/browse/FLINK-10909
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.2
Reporter: Mohamed Amine ABDESSEMED


Hello,

We suspect a RocksDBStateBackend performance regression after upgrading to 
flink-1.6.1 (rocksdbjni-5.7.5), 
similar to https://issues.apache.org/jira/browse/FLINK-5756 and 
[https://github.com/facebook/rocksdb/issues/1988] 

Here are the test results using the test code provided by [~StephanEwen] in  
[https://github.com/facebook/rocksdb/issues/1988] 

Test results with rocksdbjni-5.7.5:

begin insert
end insert - duration: *54 ms*
end get - duration: *37247 ms*


Test results with frocksdbjni-4.11.2-artisans:

begin insert
end insert - duration: *53 ms*
end get - duration: *4 ms*

Can anyone confirm ?

Regards,
Amine



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10909) Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5

2018-11-16 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689332#comment-16689332
 ] 

Stefan Richter commented on FLINK-10909:


The test you mentioned is part of the Flink test suite; specifically, it became 
{{RocksDBPerformanceTest#testRocksDbMergePerformance}}. This test is passing, 
and there is no sign of a performance regression in my opinion.
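
For reference, that micro-benchmark boils down to roughly the following sketch 
(the path, iteration count, and merge operator name are placeholders, not the 
exact test code):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDbMergeReadSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        byte[] key = "key".getBytes();
        byte[] value = "abcdefghijklmnopqrstuvwxyz".getBytes();

        try (Options options = new Options()
                 .setCreateIfMissing(true)
                 // merge operator comparable to what the RocksDB-backed list state uses
                 .setMergeOperatorName("stringappendtest");
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-merge-test")) {

            long start = System.nanoTime();
            for (int i = 0; i < 300_000; i++) {
                db.merge(key, value);          // "insert" phase
            }
            System.out.println("insert ms: " + (System.nanoTime() - start) / 1_000_000);

            start = System.nanoTime();
            db.get(key);                       // "get" phase reported as slow above
            System.out.println("get ms: " + (System.nanoTime() - start) / 1_000_000);
        }
    }
}
{code}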

> Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5
> -
>
> Key: FLINK-10909
> URL: https://issues.apache.org/jira/browse/FLINK-10909
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2
>Reporter: Mohamed Amine ABDESSEMED
>Priority: Major
>
> Hello,
> We suspect a RocksDBStateBackend performance regression after upgrading to 
> flink-1.6.1 (rocksdbjni-5.7.5), 
> similar to https://issues.apache.org/jira/browse/FLINK-5756 and 
> [https://github.com/facebook/rocksdb/issues/1988] 
> Here are the test results using the test code provided by [~StephanEwen] in  
> [https://github.com/facebook/rocksdb/issues/1988] 
> Test results with rocksdbjni-5.7.5:
> begin insert
> end insert - duration: *54 ms*
> end get - duration: *37247 ms*
> Test results with frocksdbjni-4.11.2-artisans:
> begin insert
> end insert - duration: *53 ms*
> end get - duration: *4 ms*
> Can anyone confirm ?
> Regards,
> Amine



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields errors in special config

2018-11-16 Thread GitBox
asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields 
errors in special config
URL: https://github.com/apache/flink/pull/6900
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 19e030b4c0a..8f9913bdfaa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1057,13 +1057,13 @@ abstract class CodeGenerator(
 
 // declaration
 val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType)
-if (nullCheck) {
+if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && 
!expr.nullTerm.equals(ALWAYS_NULL)) {
   reusableMemberStatements.add(s"private boolean ${expr.nullTerm};")
 }
 reusableMemberStatements.add(s"private $resultTypeTerm 
${expr.resultTerm};")
 
 // assignment
-if (nullCheck) {
+if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && 
!expr.nullTerm.equals(ALWAYS_NULL)) {
   reusablePerRecordStatements.add(s"this.${expr.nullTerm} = 
${expr.nullTerm};")
 }
 reusablePerRecordStatements.add(s"this.${expr.resultTerm} = 
${expr.resultTerm};")
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index b161eeda43c..4e880eeaf3b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -578,6 +578,22 @@ class CalcITCase(
 val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testSplitFeildsOnCustomType(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)  // splits fields
+
+val ds = CollectionDataSets.getCustomTypeDataSet(env)
+val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 
's)
+  .filter( 's.like("%a%") && 's.charLength > 12)
+  .select('i, 'l, 's.charLength)
+
+val expected = "3,3,25\n" + "3,5,14\n"
+val results = filterDs.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689453#comment-16689453
 ] 

ASF GitHub Bot commented on FLINK-10642:


asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields 
errors in special config
URL: https://github.com/apache/flink/pull/6900
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 19e030b4c0a..8f9913bdfaa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1057,13 +1057,13 @@ abstract class CodeGenerator(
 
 // declaration
 val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType)
-if (nullCheck) {
+if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && 
!expr.nullTerm.equals(ALWAYS_NULL)) {
   reusableMemberStatements.add(s"private boolean ${expr.nullTerm};")
 }
 reusableMemberStatements.add(s"private $resultTypeTerm 
${expr.resultTerm};")
 
 // assignment
-if (nullCheck) {
+if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && 
!expr.nullTerm.equals(ALWAYS_NULL)) {
   reusablePerRecordStatements.add(s"this.${expr.nullTerm} = 
${expr.nullTerm};")
 }
 reusablePerRecordStatements.add(s"this.${expr.resultTerm} = 
${expr.resultTerm};")
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index b161eeda43c..4e880eeaf3b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -578,6 +578,22 @@ class CalcITCase(
 val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testSplitFeildsOnCustomType(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)  // splits fields
+
+val ds = CollectionDataSets.getCustomTypeDataSet(env)
+val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 
's)
+  .filter( 's.like("%a%") && 's.charLength > 12)
+  .select('i, 'l, 's.charLength)
+
+val expected = "3,3,25\n" + "3,5,14\n"
+val results = filterDs.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CodeGen split fields errors when maxGeneratedCodeLength equals 1
> 
>
> Key: FLINK-10642
> URL: https://issues.apache.org/jira/browse/FLINK-10642
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.1
>Reporter: xueyu
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> Several tests fail under a special config when setting maxGeneratedCodeLength to 1, 
> e.g.
>   CalcITCase.testFilterOnCustomType:260 ? InvalidProgram Table program cannot 
> be...
>   JavaTableEnvironmentITCase.testAsFromAndToPojo:394 ? InvalidProgram Table 
> prog...
>   JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 ? 
> InvalidProgram
>   JavaTableEnvironmentITCase.testAsFromPojo:288 ? InvalidProgram Table 
> program c...
>   JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 ? InvalidProgram 
> Ta...
>   JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 ? 
> InvalidProgram ...
>   TimeAttributesITCase.testPojoSupport:566 ? JobExecution Job execution 
> failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1

2018-11-16 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-10642.
--
   Resolution: Fixed
Fix Version/s: 1.7.0
   1.6.3
   1.5.6

Fixed in master: a7ae4253e4463ff6242f69d8fdb1c4a76af0f14c
Fixed in 1.7.0: 146f4340ed02cd2337c42b1480fd9c8b070997ad
Fixed in 1.6.3: e8fc37e8aab27b5ee468f444d4c9f13927552166
Fixed in 1.5.6: 1d8bc86d152cde2941750c99f6cfd2507c96afdb

> CodeGen split fields errors when maxGeneratedCodeLength equals 1
> 
>
> Key: FLINK-10642
> URL: https://issues.apache.org/jira/browse/FLINK-10642
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.1
>Reporter: xueyu
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Several tests fail under a special config when setting maxGeneratedCodeLength to 1, 
> e.g.
>   CalcITCase.testFilterOnCustomType:260 ? InvalidProgram Table program cannot 
> be...
>   JavaTableEnvironmentITCase.testAsFromAndToPojo:394 ? InvalidProgram Table 
> prog...
>   JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 ? 
> InvalidProgram
>   JavaTableEnvironmentITCase.testAsFromPojo:288 ? InvalidProgram Table 
> program c...
>   JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 ? InvalidProgram 
> Ta...
>   JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 ? 
> InvalidProgram ...
>   TimeAttributesITCase.testPojoSupport:566 ? JobExecution Job execution 
> failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr closed pull request #3830: [hotfix] [table] Optimize aggregate function get type

2018-11-16 Thread GitBox
twalthr closed pull request #3830: [hotfix] [table] Optimize aggregate function 
get type
URL: https://github.com/apache/flink/pull/3830
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition

2018-11-16 Thread GitBox
twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support 
partition
URL: https://github.com/apache/flink/pull/3569
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf7588f4..1b07fd8998d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.api
 
+import com.google.common.base.Joiner
 import org.apache.flink.table.catalog.TableSourceConverter
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * Exception for all errors occurring during expression parsing.
@@ -74,6 +76,50 @@ object ValidationException {
   */
 case class UnresolvedException(msg: String) extends RuntimeException(msg)
 
+/**
+  * Exception for an operation on a nonexistent partition
+  *
+  * @param dbdatabase name
+  * @param table table name
+  * @param partitionSpec partition spec
+  * @param cause the cause
+  */
+case class PartitionNotExistException(
+db: String,
+table: String,
+partitionSpec: PartitionSpec,
+cause: Throwable)
+extends RuntimeException(
+  s"Partition 
[${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+  s"does not exist in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+this(db, table, partitionSpec, null)
+
+}
+
+/**
+  * Exception for adding an already existent partition
+  *
+  * @param dbdatabase name
+  * @param table table name
+  * @param partitionSpec partition spec
+  * @param cause the cause
+  */
+case class PartitionAlreadyExistException(
+db: String,
+table: String,
+partitionSpec: PartitionSpec,
+cause: Throwable)
+extends RuntimeException(
+  s"Partition 
[${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+  s"already exists in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+this(db, table, partitionSpec, null)
+
+}
+
 /**
   * Exception for an operation on a nonexistent table
   *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index fcefa45fcc5..78798ce0b72 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -19,12 +19,81 @@
 package org.apache.flink.table.catalog
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * The CrudExternalCatalog provides methods to create, drop, and alter 
databases or tables.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
+  /**
+* Adds a partition to the catalog.
+*
+* @param dbName The name of the table's database.
+* @param tableName  The name of the table.
+* @param part   Description of the partition to add.
+* @param ignoreIfExists Flag to specify behavior if a partition with the 
given spec
+*   already exists:
+*   if set to false, it throws a 
PartitionAlreadyExistException,
+*   if set to true, nothing happens.
+* @throws DatabaseNotExistException  thrown if the database does not 
exist in the catalog.
+* @throws TableNotExistException thrown if the table does not 
exist in the catalog.
+* @throws PartitionAlreadyExistException thrown if the partition already 
exists and
+*ignoreIfExists is false
+*/
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionAlreadyExistException]
+  def createPartition(
+  dbName: String,
+  tableName: String,
+  part: ExternalCatalogTablePartition,
+  ignoreIfExists: Boolean): Unit
+
+  /**
+* Deletes partition from a database of the catalog.
+*
+* @param dbNameThe name of the table's database.
+* @param tableName The name of the table.
+* @param partSpec  Description of the partition to add.
+* @param ignoreIfNotExists Flag to specify behavior if the partition does 
not exist:
+*  

[GitHub] tzulitai opened a new pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution

2018-11-16 Thread GitBox
tzulitai opened a new pull request #7124: [FLINK-9574] [doc]  Rework 
documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124
 
 
   ## What is the purpose of the change
   
   This PR adds a new "State Schema Evolution" page under "State & Fault 
Tolerance".
   It also reworks the "Custom State Serialization" page to reflect the new 
serializer / serializer snapshot abstractions in 1.7.
   
   - The "State Schema Evolution" page is intended for the majority of users 
who do not use custom serializers, and just care about what state types they 
should use if they care about evolvable schema, and their limitations. The list 
of supported types only includes Avro now, because we only support Avro schema 
evolution in 1.7.
   - The "Custom State Serialization" page is intended for power users who 
implement their own state serializer. It explains the abstractions and how 
Flink interacts with them. The document is also targeted for Flink developers 
who might implement Flink-shipped serializers.
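   
   As a rough illustration of what the schema-evolution page targets (the 
Avro-generated `Address` class below is hypothetical), evolvable keyed state is 
declared like any other state:
   
```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// "Address" stands in for any Avro-generated SpecificRecord class. With flink-avro
// on the classpath its state serializer is Avro-based, so the schema may later
// evolve (e.g. add a field with a default) and previously written state stays readable.
public class LastAddressFunction extends RichFlatMapFunction<Address, String> {

    private transient ValueState<Address> lastAddress;

    @Override
    public void open(Configuration parameters) {
        lastAddress = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-address", Address.class));
    }

    @Override
    public void flatMap(Address address, Collector<String> out) throws Exception {
        lastAddress.update(address);
        out.collect(address.toString());
    }
}
```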
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-6036) Let catalog support partition

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-6036:
--
Labels: pull-request-available  (was: )

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the catalog only supports CRUD at the database and table level. But in 
> some kinds of catalogs, for example Hive, we also need to do CRUD operations at 
> the partition level. 
> This issue aims to let the catalog support partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9574) Add a dedicated documentation page for state evolution

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9574:
--
Labels: pull-request-available  (was: )

> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, State Backends, Checkpointing, Type 
> Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the only bit of documentation about serializer upgrades / state 
> evolution, is 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
>  which only explains things at an API level.
> State evolution over time has proved to be a rather complex topic that is 
> often overlooked by users. Users would probably benefit from an actual 
> fully-fledged dedicated page that covers the API, some necessary internal 
> details regarding the interplay of state serializers, best practices, and caution 
> notices.
> I propose to add this documentation as a subpage under Streaming/State & 
> Fault-Tolerance/.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689502#comment-16689502
 ] 

ASF GitHub Bot commented on FLINK-9574:
---

tzulitai opened a new pull request #7124: [FLINK-9574] [doc]  Rework 
documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124
 
 
   ## What is the purpose of the change
   
   This PR adds a new "State Schema Evolution" page under "State & Fault 
Tolerance".
   It also reworks the "Custom State Serialization" page to reflect the new 
serializer / serializer snapshot abstractions in 1.7.
   
   - The "State Schema Evolution" page is intended for the majority of users 
who do not use custom serializers, and just care about what state types they 
should use if they care about evolvable schema, and their limitations. The list 
of supported types only includes Avro now, because we only support Avro schema 
evolution in 1.7.
   - The "Custom State Serialization" page is intended for power users who 
implement their own state serializer. It explains the abstractions and how 
Flink interacts with them. The document is also targeted for Flink developers 
who might implement Flink-shipped serializers.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, State Backends, Checkpointing, Type 
> Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the only bit of documentation about serializer upgrades / state 
> evolution, is 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
>  which only explains things at an API level.
> State evolution over time has proved to be a rather complex topic that is 
> often overlooked by users. Users would probably benefit from an actual 
> fully-fledged dedicated page that covers the API, some necessary internal 
> details regarding the interplay of state serializers, best practices, and caution 
> notices.
> I propose to add this documentation as a subpage under Streaming/State & 
> Fault-Tolerance/.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6457) Clean up ScalarFunction and TableFunction interface

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689515#comment-16689515
 ] 

ASF GitHub Bot commented on FLINK-6457:
---

twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction 
and TableFunction interface
URL: https://github.com/apache/flink/pull/3880
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 9f50f0c806a..1aab7ae0993 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -68,6 +68,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable.HashMap
 import _root_.scala.annotation.varargs
+import _root_.scala.util.{Try, Success, Failure}
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -340,10 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) {
 // check if class could be instantiated
 checkForInstantiation(function.getClass)
 
-val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
-  function.getResultType
-} else {
-  implicitly[TypeInformation[T]]
+val typeInfo: TypeInformation[_] = Try {
+  function.getClass.getDeclaredMethod("getResultType")
+} match {
+  case Success(m) => m.invoke(function).asInstanceOf[TypeInformation[_]]
+  case Failure(_) => implicitly[TypeInformation[T]]
 }
 
 // register in Table API
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
new file mode 100644
index 000..d781e76071d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
+import org.apache.flink.table.functions.ScalarFunction
+
+/**
+  * Holds methods to convert a [[ScalarFunction]] call
+  * in the Scala Table API into a [[ScalarFunctionCall]].
+  *
+  * @param sf The ScalarFunction to convert.
+  */
+class ScalarFunctionConversions(sf: ScalarFunction) {
+  /**
+* Creates a [[ScalarFunctionCall]] in Scala Table API.
+*
+* @param params actual parameters of function
+* @return [[Expression]] in form of a [[ScalarFunctionCall]]
+*/
+  final def apply(params: Expression*): Expression = {
+ScalarFunctionCall(sf, params)
+  }
+}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
index 692876fe2da..292e241077a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
@@ -24,6 +24,8 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
 
+import scala.util.{Failure, Success, Try}
+
 /**
   * Holds methods to convert a [[TableFunction]] call in the Scala Table API 
into a [[Table]].
   *
@@ -39,7 +41,12 @@ class TableFunctionConversions[T](tf: TableFunction[T]) {
 */
   final def apply(args: 

[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689568#comment-16689568
 ] 

ASF GitHub Bot commented on FLINK-10900:


pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. Thanks for 
pointing it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689569#comment-16689569
 ] 

ASF GitHub Bot commented on FLINK-10900:


pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. Thanks for 
pointing it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible

2018-11-16 Thread GitBox
twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table 
factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087#issuecomment-439437076
 
 
   Thanks @pnowojski. Will merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689586#comment-16689586
 ] 

ASF GitHub Bot commented on FLINK-10843:


twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table 
factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087#issuecomment-439437076
 
 
   Thanks @pnowojski. Will merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make Kafka version definition more flexible for new Kafka table factory
> ---
>
> Key: FLINK-10843
> URL: https://issues.apache.org/jira/browse/FLINK-10843
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, a user has to specify a specific version for a Kafka connector 
> like:
> {code}
> connector:
>   type: kafka
>   version: "0.11" # required: valid connector versions are "0.8", "0.9", 
> "0.10", and "0.11"
>   topic: ...  # required: topic name from which the table is read
> {code}
> However, the new Kafka connector aims to be universal; thus, at least the 1.x 
> and 2.x version strings should be supported as parameters as well. 
> Currently, {{2.0}} is the only accepted string for the factory.
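For illustration, a minimal sketch of how the universal connector can then be addressed from the Java Table API, assuming the 1.7 descriptor classes (Kafka, Json, Schema) and that the factory accepts "universal" after this change; the topic name and broker address are placeholders:

{code}
// Hedged sketch: topic name and bootstrap servers are placeholders, and the
// "universal" version string assumes the factory accepts it after FLINK-10843.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class UniversalKafkaDescriptorExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv.connect(
                new Kafka()
                    .version("universal")                          // instead of a fixed "0.11" or "2.0"
                    .topic("test-topic")                           // placeholder topic
                    .property("bootstrap.servers", "localhost:9092"))
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema().field("message", Types.STRING))
            .inAppendMode()
            .registerTableSource("KafkaSource");

        tableEnv.scan("KafkaSource").printSchema();
    }
}
{code}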



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689606#comment-16689606
 ] 

TisonKun commented on FLINK-10880:
--

[~till.rohrmann] alright, I agree with your quick fix and hope we can make the 
release ASAP :-)

> Failover strategies should not be applied to Batch Execution
> 
>
> Key: FLINK-10880
> URL: https://issues.apache.org/jira/browse/FLINK-10880
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.2
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.6.3, 1.7.0
>
>
> When configuring a failover strategy other than "full", DataSet/Batch 
> execution is currently not correct.
> This is expected: the failover region strategy is an experimental WIP feature 
> for streaming that has not been extended to the DataSet API.
> We need to document this and prevent execution of DataSet features with other 
> failover strategies than "full".
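A hedged sketch of the kind of guard the last sentence asks for; the option key and the fail-fast placement are illustrative, since the real check belongs in the scheduler/ExecutionGraph setup rather than in user code:

{code}
// Illustrative guard only: fail fast when a batch (DataSet) job is configured with
// a failover strategy other than "full". The option key mirrors
// JobManagerOptions#EXECUTION_FAILOVER_STRATEGY but is hard-coded here for brevity.
import org.apache.flink.configuration.Configuration;

final class BatchFailoverGuard {

    private BatchFailoverGuard() {}

    static void checkFailoverStrategy(Configuration config, boolean isBatchJob) {
        String strategy = config.getString("jobmanager.execution.failover-strategy", "full");
        if (isBatchJob && !"full".equalsIgnoreCase(strategy)) {
            throw new IllegalStateException(
                "Batch (DataSet) execution only supports the 'full' failover strategy, but '"
                    + strategy + "' is configured.");
        }
    }
}
{code}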



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689607#comment-16689607
 ] 

ASF GitHub Bot commented on FLINK-10872:


asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to 
test KafkaTableSink for kafka connector 0.11
URL: https://github.com/apache/flink/pull/7100
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml 
b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6b02aba4bb3..db1fca6df49 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -191,14 +191,13 @@ under the License.

sql-jar

jar

-   
+   


org.apache.flink

flink-connector-elasticsearch6_${scala.binary.version}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 6d4560864f8..9d51d03c7e4 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -143,6 +143,7 @@ run_test "State TTL RocksDb backend end-to-end test" 
"$END_TO_END_DIR/test-scrip
 
 run_test "SQL Client end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_sql_client.sh"
 run_test "SQL Client end-to-end test for Kafka 0.10" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
+run_test "SQL Client end-to-end test for Kafka 0.11" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
 
 run_test "Heavy deployment end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh
new file mode 100755
index 000..8d3af0f7925
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.11 0.11.0.2 3.2.0 
3.2 "kafka-0.11"


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
> 
>
> Key: FLINK-10872
> URL: https://issues.apache.org/jira/browse/FLINK-10872
> Project: Flink
>  Issue Type: Test
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-16 Thread GitBox
asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to 
test KafkaTableSink for kafka connector 0.11
URL: https://github.com/apache/flink/pull/7100
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml 
b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6b02aba4bb3..db1fca6df49 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -191,14 +191,13 @@ under the License.

sql-jar

jar

-   
+   


org.apache.flink

flink-connector-elasticsearch6_${scala.binary.version}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 6d4560864f8..9d51d03c7e4 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -143,6 +143,7 @@ run_test "State TTL RocksDb backend end-to-end test" 
"$END_TO_END_DIR/test-scrip
 
 run_test "SQL Client end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_sql_client.sh"
 run_test "SQL Client end-to-end test for Kafka 0.10" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
+run_test "SQL Client end-to-end test for Kafka 0.11" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" 
"$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
 
 run_test "Heavy deployment end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh
new file mode 100755
index 000..8d3af0f7925
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.11 0.11.0.2 3.2.0 
3.2 "kafka-0.11"


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689665#comment-16689665
 ] 

ASF GitHub Bot commented on FLINK-10455:


pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7108#issuecomment-439457861
 
 
   Thanks @azagrebin, merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
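Before the merged diffs further down in this thread, a compact hedged sketch of the two patterns the fix applies: recycle the producer even when the commit throws, and close producers of still-pending transactions on shutdown. The pool and method names are illustrative stand-ins, not FlinkKafkaProducer011's real internals:

{code}
// Sketch only: Producer is the plain kafka-clients API, and the idle-producer pool
// stands in for FlinkKafkaProducer011's producer recycling.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Properties;

final class TransactionalProducerLifecycle {

    private final Deque<Producer<byte[], byte[]>> idleProducers = new ArrayDeque<>();

    /** Commit a transaction and always hand the producer back, even if the commit fails. */
    void commit(Producer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } finally {
            idleProducers.push(producer); // recycle even after e.g. a ProducerFencedException
        }
    }

    /** On close, producers held by still-pending transactions must not be left dangling. */
    void closePending(Iterable<Producer<byte[], byte[]>> pendingProducers) {
        for (Producer<byte[], byte[]> producer : pendingProducers) {
            try {
                producer.close();
            } catch (Exception e) {
                // best-effort cleanup, analogous to IOUtils.closeQuietly in the merged change
            }
        }
    }

    static Producer<byte[], byte[]> newTransactionalProducer(Properties config) {
        return new KafkaProducer<>(config); // config must contain a transactional.id
    }
}
{code}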



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination

2018-11-16 Thread GitBox
pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7108#issuecomment-439457861
 
 
   Thanks @azagrebin merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689664#comment-16689664
 ] 

ASF GitHub Bot commented on FLINK-10455:


pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7107#issuecomment-439457778
 
 
   Thanks @azagrebin, merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation

2018-11-16 Thread GitBox
tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude 
JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation
URL: https://github.com/apache/flink/pull/7129#discussion_r234281426
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 ##
 @@ -106,6 +106,7 @@
/**
 * This option specifies the failover strategy, i.e. how the job 
computation recovers from task failures.
 */
+   @Documentation.ExcludeFromDocumentation
 
 Review comment:
   Good idea. Will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Reopened] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values

2018-11-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-10907:
--

The issue may affect the legacy mode in 1.5/1.6.

> Job recovery on the same JobManager causes JobManager metrics to report stale 
> values
> 
>
> Key: FLINK-10907
> URL: https://issues.apache.org/jira/browse/FLINK-10907
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Metrics
>Affects Versions: 1.4.2
> Environment: Verified the bug and the fix running on Flink 1.4
> Based on the JobManagerMetricGroup.java code in master, this issue should 
> still occur on Flink versions after 1.4.
>Reporter: Mark Cho
>Priority: Minor
>  Labels: pull-request-available
>
> https://github.com/apache/flink/pull/7119
>  * JobManager loses and regains leadership if it loses connection and 
> reconnects to ZooKeeper.
>  * When it regains the leadership, it tries to recover the job graph.
>  * During the recovery, it will try to reuse the existing 
> {{JobManagerMetricGroup}} to register new counters and gauges under the same 
> metric name, which causes the new counters and gauges to be registered 
> incorrectly.
>  * The old counters and gauges will continue to report the stale values and 
> the new counters and gauges will not report the latest metric.
> Relevant lines from logs
> {code:java}
> com.---.JobManager - Submitting recovered job 
> e9e49fd9b8c61cf54b435f39aa49923f.
> com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f 
> (flink-job) (Recovery).
> com.---.JobManager - Running initialization on master for job flink-job 
> (e9e49fd9b8c61cf54b435f39aa49923f).
> com.---.JobManager - Successfully ran initialization on master in 0 ms.
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointDuration'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'lastCheckpointExternalPath'. Metric will not be 
> reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'restartingTime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'downtime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'uptime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'fullRestarts'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains 
> a Metric with the name 'task_failures'. Metric will not be reported.[]
> {code}
>  
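For readers skimming the log excerpt, a small hedged sketch of the pattern the report implies: retire the per-job metric group left over from the previous execution before the recovered job re-registers metrics under the same names. Class and method names below are stand-ins, not the actual JobManagerMetricGroup API:

{code}
// Illustrative bookkeeping only; AutoCloseable stands in for a metric group whose
// close() unregisters its counters and gauges.
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class PerJobMetricGroups {

    private final Map<String, AutoCloseable> groupsByJobId = new HashMap<>();

    /** Returns a fresh group for the job, closing (unregistering) any stale one first. */
    AutoCloseable recreate(String jobId, Supplier<AutoCloseable> groupFactory) throws Exception {
        AutoCloseable stale = groupsByJobId.remove(jobId);
        if (stale != null) {
            stale.close(); // drop the old metrics so the new registrations do not collide
        }
        AutoCloseable fresh = groupFactory.get();
        groupsByJobId.put(jobId, fresh);
        return fresh;
    }
}
{code}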



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689517#comment-16689517
 ] 

ASF GitHub Bot commented on FLINK-10869:


zentol commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234231999
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_s3.sh
 ##
 @@ -68,8 +68,8 @@ function s3_setup {
   trap s3_cleanup EXIT
 
   cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
 
 Review comment:
   let's either change these to use the variables defined at the top 
(`AWS_REGION`, `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`), or remove said variables 
since they are unused.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`
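A hedged sketch of how a JUnit-based integration test might consume the variables listed above; S3TestEnv is a hypothetical helper for illustration, not Flink's actual S3TestCredentials class:

{code}
// Hypothetical test helper: reads the IT_CASE_S3_* variables named in the issue and
// skips (rather than fails) S3 tests when the credentials are not available.
import static org.junit.Assume.assumeTrue;

public final class S3TestEnv {

    public static final String BUCKET = System.getenv("IT_CASE_S3_BUCKET");
    public static final String ACCESS_KEY = System.getenv("IT_CASE_S3_ACCESS_KEY");
    public static final String SECRET_KEY = System.getenv("IT_CASE_S3_SECRET_KEY");

    private S3TestEnv() {}

    /** Call at the start of an S3 integration test. */
    public static void assumeCredentialsAvailable() {
        assumeTrue("IT_CASE_S3_* environment variables not set, skipping S3 test",
            BUCKET != null && ACCESS_KEY != null && SECRET_KEY != null);
    }

    /** s3:// URI of the configured test bucket. */
    public static String testBucketUri() {
        return "s3://" + BUCKET + "/";
    }
}
{code}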



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-16 Thread GitBox
zentol commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234231999
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_s3.sh
 ##
 @@ -68,8 +68,8 @@ function s3_setup {
   trap s3_cleanup EXIT
 
   cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
 
 Review comment:
   let's either change these to use the variables defined at the top 
(`AWS_REGION`, `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`), or remove said variables 
since they are unused.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-16 Thread GitBox
zentol commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234230495
 
 

 ##
 File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
 ##
 @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException {
 
@Test(expected = UnsupportedOperationException.class)
public void requestingRecoverableWriterShouldThroughException() throws 
Exception {
-   FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) 
FileSystem.get(basePath.toUri());
+   URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri());
 
 Review comment:
   This is not using `TEST_DATA_DIR` anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689528#comment-16689528
 ] 

ASF GitHub Bot commented on FLINK-10869:


StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234236294
 
 

 ##
 File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
 ##
 @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException {
 
@Test(expected = UnsupportedOperationException.class)
public void requestingRecoverableWriterShouldThroughException() throws 
Exception {
-   FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) 
FileSystem.get(basePath.toUri());
+   URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri());
 
 Review comment:
   It never used the `TEST_DATA_DIR`; the `getFileSystem()` call only uses scheme 
and authority, so the path does not matter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10906) docker-entrypoint.sh logs credentials during startup

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689574#comment-16689574
 ] 

ASF GitHub Bot commented on FLINK-10906:


tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print 
configuration in docker-entrypoint.sh
URL: https://github.com/apache/flink/pull/7125
 
 
   ## What is the purpose of the change
   
   In order to not leak secrets we should not print the configuration in 
docker-entrypoint.sh.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> docker-entrypoint.sh logs credentials during startup
> 
>
> Key: FLINK-10906
> URL: https://issues.apache.org/jira/browse/FLINK-10906
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Reporter: Konstantin Knauf
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
>
>  {{docker-entrypoint.sh}} prints the full Flink configuration file, including 
> potentially sensitive configuration entries.
> To be consistent, we should hide these as in {{GlobalConfiguration}}.
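The PR opened above simply drops the printout; for completeness, a hedged sketch of the masking alternative the issue mentions. It assumes GlobalConfiguration.isSensitive(String) is available (as used when Flink logs configuration values); the conf path and the "******" placeholder are illustrative choices:

{code}
// Hedged sketch: echo flink-conf.yaml with the values of sensitive keys masked.
import org.apache.flink.configuration.GlobalConfiguration;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PrintMaskedFlinkConf {

    public static void main(String[] args) throws IOException {
        for (String line : Files.readAllLines(Paths.get("/opt/flink/conf/flink-conf.yaml"))) {
            int colon = line.indexOf(':');
            if (colon > 0 && GlobalConfiguration.isSensitive(line.substring(0, colon).trim())) {
                System.out.println(line.substring(0, colon) + ": ******"); // hide the secret
            } else {
                System.out.println(line);
            }
        }
    }
}
{code}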



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh

2018-11-16 Thread GitBox
tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print 
configuration in docker-entrypoint.sh
URL: https://github.com/apache/flink/pull/7125
 
 
   ## What is the purpose of the change
   
   In order to not leak secrets we should not print the configuration in 
docker-entrypoint.sh.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10900.
--
Resolution: Fixed

merged commit 1680132 into apache:master
commits 146f4340ed..d02af7eda3 into apache:release-1.7

> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory

2018-11-16 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-10843.
--
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in master: ad7e81ae3642f7d1f695a6f2474c59a81bb83949
Fixed in 1.7.0: d2c2773972fa0b4c5a348d2f885b6c9956432c18

> Make Kafka version definition more flexible for new Kafka table factory
> ---
>
> Key: FLINK-10843
> URL: https://issues.apache.org/jira/browse/FLINK-10843
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, a user has to specify a specific version for a Kafka connector 
> like:
> {code}
> connector:
>   type: kafka
>   version: "0.11" # required: valid connector versions are "0.8", "0.9", 
> "0.10", and "0.11"
>   topic: ...  # required: topic name from which the table is read
> {code}
> However, the new Kafka connector aims to be universal; thus, at least the 1.x 
> and 2.x version strings should be supported as parameters as well. 
> Currently, {{2.0}} is the only accepted string for the factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-16 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-10872.
--
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in master: acd041cca5d591176aafc7de5d62db45c13b7375
Fixed in 1.7.0: 1891949d2584e67a5edb878eac62268c447fdc1f

> Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
> 
>
> Key: FLINK-10872
> URL: https://issues.apache.org/jira/browse/FLINK-10872
> Project: Flink
>  Issue Type: Test
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination

2018-11-16 Thread GitBox
pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close 
transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7108
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index adecab16d8c..d332cd0ca6a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,6 +45,7 @@
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+   pendingTransactions().forEach(transaction ->
+   IOUtils.closeQuietly(transaction.getValue().producer)
+   );
}
 
// --- Logic for handling checkpoint flushing 
-- //
@@ -713,8 +717,11 @@ protected void preCommit(KafkaTransactionState 
transaction) throws FlinkKafka011
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
-   transaction.producer.commitTransaction();
-   recycleTransactionalProducer(transaction.producer);
+   try {
+   transaction.producer.commitTransaction();
+   } finally {
+   
recycleTransactionalProducer(transaction.producer);
+   }
}
}
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 8398aa8cdbf..f2725242b2c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -38,20 +38,24 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Clock;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -149,6 +153,12 @@ protected TXN currentTransaction() {
return currentTransactionHolder == null ? null : 
currentTransactionHolder.handle;
}
 
+   @Nonnull
+   protected Stream> pendingTransactions() {
+   return pendingCommitTransactions.entrySet().stream()
+   .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().handle));
+   }
+
// -- methods that should be implemented in child class to support 
two phase commit algorithm --
 
/**
@@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
 
Iterator>> 
pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint 
completed, but no transaction pending");
+   Throwable firstError = null;
 
while (pendingTransactionIterator.hasNext()) {
Map.Entry> entry = 
pendingTransactionIterator.next();
@@ -269,12 +280,23 @@ public final void 

[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689662#comment-16689662
 ] 

ASF GitHub Bot commented on FLINK-10455:


pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close 
transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7108
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index adecab16d8c..d332cd0ca6a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,6 +45,7 @@
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+   pendingTransactions().forEach(transaction ->
+   IOUtils.closeQuietly(transaction.getValue().producer)
+   );
}
 
// --- Logic for handling checkpoint flushing 
-- //
@@ -713,8 +717,11 @@ protected void preCommit(KafkaTransactionState 
transaction) throws FlinkKafka011
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
-   transaction.producer.commitTransaction();
-   recycleTransactionalProducer(transaction.producer);
+   try {
+   transaction.producer.commitTransaction();
+   } finally {
+   
recycleTransactionalProducer(transaction.producer);
+   }
}
}
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 8398aa8cdbf..f2725242b2c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -38,20 +38,24 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Clock;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -149,6 +153,12 @@ protected TXN currentTransaction() {
return currentTransactionHolder == null ? null : 
currentTransactionHolder.handle;
}
 
+   @Nonnull
+   protected Stream> pendingTransactions() {
+   return pendingCommitTransactions.entrySet().stream()
+   .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().handle));
+   }
+
// -- methods that should be implemented in child class to support 
two phase commit algorithm --
 
/**
@@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
 
Iterator>> 
pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint 
completed, but no transaction pending");
+   

[jira] [Closed] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10455.
--
   Resolution: Fixed
Fix Version/s: 1.6.3
   1.5.6

merged commit e493d83 into apache:release-1.5
merged commit 489be82 into apache:release-1.6

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689661#comment-16689661
 ] 

ASF GitHub Bot commented on FLINK-10880:


tillrohrmann opened a new pull request #7129: [FLINK-10880] Exclude 
JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation
URL: https://github.com/apache/flink/pull/7129
 
 
   ## What is the purpose of the change
   
   This commit excludes the `JobManagerOptions#EXECUTION_FAILOVER_STRATEGY` 
from Flink's
   configuration documentation.
   
   ## Brief change log
   
   - Introduce `Documentation#ExcludeFromDocumentation`
   - Exclude `JobManagerOptions#EXECUTION_FAILOVER_STRATEGY` from documentation
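   (For orientation, a hedged sketch of what such an annotation could look like; package, retention, and the optional reason attribute are assumptions, only the nested name `Documentation.ExcludeFromDocumentation` is taken from the change log above.)

{code}
// Hedged sketch, not the class added by this PR: package location, retention policy,
// and the optional reason attribute are assumptions.
package org.apache.flink.annotation.docs;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public final class Documentation {

    /** Marks a ConfigOption field that the config documentation generator should skip. */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ExcludeFromDocumentation {
        /** Optional reason why the option is excluded from the generated docs. */
        String value() default "";
    }

    private Documentation() {}
}
{code}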
   
   ## Verifying this change
   
   - Added `ConfigOptionsDocGeneratorTest#testConfigOptionExclusion`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Failover strategies should not be applied to Batch Execution
> 
>
> Key: FLINK-10880
> URL: https://issues.apache.org/jira/browse/FLINK-10880
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.2
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> When configuring a failover strategy other than "full", DataSet/Batch 
> execution is currently not correct.
> This is expected: the failover region strategy is an experimental WIP feature 
> for streaming that has not been extended to the DataSet API.
> We need to document this and prevent execution of DataSet features with other 
> failover strategies than "full".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689659#comment-16689659
 ] 

ASF GitHub Bot commented on FLINK-10455:


pnowojski closed pull request #7107: [FLINK-10455][Kafka Tx] Close 
transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7107
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 84973726113..fe383ad6801 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,6 +45,7 @@
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+   pendingTransactions().forEach(transaction ->
+   IOUtils.closeQuietly(transaction.getValue().producer)
+   );
}
 
// --- Logic for handling checkpoint flushing 
-- //
@@ -714,8 +718,11 @@ protected void preCommit(KafkaTransactionState 
transaction) throws FlinkKafka011
protected void commit(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
-   transaction.producer.commitTransaction();
-   
recycleTransactionalProducer(transaction.producer);
+   try {
+   
transaction.producer.commitTransaction();
+   } finally {
+   
recycleTransactionalProducer(transaction.producer);
+   }
break;
case AT_LEAST_ONCE:
case NONE:
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 2ffb6d5810e..62562623fc4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -38,20 +38,24 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Clock;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -149,6 +153,12 @@ protected TXN currentTransaction() {
return currentTransactionHolder == null ? null : 
currentTransactionHolder.handle;
}
 
+   @Nonnull
+   protected Stream> pendingTransactions() {
+   return pendingCommitTransactions.entrySet().stream()
+   .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().handle));
+   }
+
// -- methods that should be implemented in child class to support 
two phase commit algorithm --
 
/**
@@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
 
Iterator>> 
pendingTransactionIterator = 

[jira] [Reopened] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-10455:


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
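
For readers hitting the broker timeout mentioned in [1], a minimal sketch (not taken from this issue) of raising the producer-side transaction timeout when building the exactly-once sink; the broker address, topic name, and the 15-minute value are illustrative assumptions:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkSketch {

    public static FlinkKafkaProducer011<String> createSink() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        // Keep the transaction timeout comfortably above the checkpoint interval,
        // otherwise the broker may fence the producer between two checkpoints.
        producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 min, example value

        return new FlinkKafkaProducer011<>(
            "my-topic", // assumed topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProps,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}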



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination

2018-11-16 Thread GitBox
pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7107#issuecomment-439457778
 
 
   Thanks @azagrebin, merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1

2018-11-16 Thread GitBox
pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version 
to 2.0.1
URL: https://github.com/apache/flink/pull/7101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka/pom.xml 
b/flink-connectors/flink-connector-kafka/pom.xml
index 67ec39d8f27..63d43994687 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -36,7 +36,7 @@ under the License.
jar
 

-   2.0.0
+   2.0.1

 

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml 
b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index fcfe4bf69f0..1a6a80e9c8f 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -114,7 +114,7 @@ under the License.
as we neither access nor package the 
kafka dependencies -->
org.apache.kafka
kafka-clients
-   2.0.0
+   2.0.1



diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 5dd68838ba7..8ace250a319 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,7 +20,7 @@
 set -Eeuo pipefail
 
 KAFKA_CONNECTOR_VERSION="2.0"
-KAFKA_VERSION="2.0.0"
+KAFKA_VERSION="2.0.1"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
 KAFKA_SQL_VERSION="universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
index 94e89a2b1e8..0941cf200f1 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka" "universal"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.1 5.0.0 5.0 
"kafka" "universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
index 044d2237988..ff36cf1d778 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
@@ -20,6 +20,6 @@
 set -Eeuo pipefail
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0
+source "$(dirname "$0")"/kafka-common.sh 2.0.1 5.0.0 5.0
 
 source "$(dirname "$0")"/test_streaming_kafka_common.sh 
$FLINK_DIR/examples/streaming/KafkaExample.jar


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689681#comment-16689681
 ] 

ASF GitHub Bot commented on FLINK-10891:


pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version 
to 2.0.1
URL: https://github.com/apache/flink/pull/7101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka/pom.xml 
b/flink-connectors/flink-connector-kafka/pom.xml
index 67ec39d8f27..63d43994687 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -36,7 +36,7 @@ under the License.
jar
 

-   2.0.0
+   2.0.1

 

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml 
b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index fcfe4bf69f0..1a6a80e9c8f 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -114,7 +114,7 @@ under the License.
as we neither access nor package the 
kafka dependencies -->
org.apache.kafka
kafka-clients
-   2.0.0
+   2.0.1



diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 5dd68838ba7..8ace250a319 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,7 +20,7 @@
 set -Eeuo pipefail
 
 KAFKA_CONNECTOR_VERSION="2.0"
-KAFKA_VERSION="2.0.0"
+KAFKA_VERSION="2.0.1"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
 KAFKA_SQL_VERSION="universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
index 94e89a2b1e8..0941cf200f1 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka" "universal"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.1 5.0.0 5.0 
"kafka" "universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
index 044d2237988..ff36cf1d778 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
@@ -20,6 +20,6 @@
 set -Eeuo pipefail
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0
+source "$(dirname "$0")"/kafka-common.sh 2.0.1 5.0.0 5.0
 
 source "$(dirname "$0")"/test_streaming_kafka_common.sh 
$FLINK_DIR/examples/streaming/KafkaExample.jar


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade Kafka client version to 2.0.1
> -
>
> Key: FLINK-10891
> URL: https://issues.apache.org/jira/browse/FLINK-10891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Since the modern Kafka connector only tracks the latest version of the Kafka 
> client, and Kafka 2.0.1 has been released, we should upgrade the version of 
> the Kafka client Maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1

2018-11-16 Thread GitBox
pnowojski commented on issue #7101: [FLINK-10891] Upgrade Kafka client version 
to 2.0.1
URL: https://github.com/apache/flink/pull/7101#issuecomment-439462934
 
 
   I'm merging it, thanks again for the contribution @yanghua.
   
   Regarding the issues, I always search Apache JIRA for either a failing test 
name or an exception message, since one of those is always mentioned somewhere.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10904) Expose Classloader before Pipeline execution

2018-11-16 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689722#comment-16689722
 ] 

Stephan Ewen commented on FLINK-10904:
--

Anywhere in the Flink processes (TaskManager) where any application code is 
involved, the classloader should be available as the thread context 
classloader. It should be available there before any execution starts; even 
when deserializing the code from the RPC payload, it should already be 
available.
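
For illustration, a minimal sketch (not from this issue) of picking up that thread context classloader from inside user code, e.g. in a rich function's open() method; the loaded class name is a placeholder:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ContextClassLoaderSketch extends RichMapFunction<String, String> {

    private transient Class<?> dynamicallyLoaded;

    @Override
    public void open(Configuration parameters) throws Exception {
        // On the TaskManager, the user-code classloader is installed as the
        // thread context classloader before any user code runs.
        ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
        // "com.example.MyOperatorDependency" is a hypothetical class shipped in the user jar.
        dynamicallyLoaded = Class.forName("com.example.MyOperatorDependency", true, userCodeClassLoader);
    }

    @Override
    public String map(String value) {
        return value + " (" + dynamicallyLoaded.getSimpleName() + ")";
    }
}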

> Expose Classloader before Pipeline execution
> 
>
> Key: FLINK-10904
> URL: https://issues.apache.org/jira/browse/FLINK-10904
> Project: Flink
>  Issue Type: Wish
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Not sure if this is something that I just have to deal with. However, it would 
> be nice if the classloader could be accessed before the pipeline starts 
> executing. The case for this is that I am loading classes that contain Flink 
> operators. I am running into NoClassDefFoundError issues because the 
> classloader used by Flink is different from the one of the application program 
> being run. Currently, my workaround is to add the libs that cause these issues 
> to the /lib directory of the Flink cluster and mark them as provided in the 
> application jar that is uploaded to the Flink cluster. The issues with this 
> are twofold: first, it makes deployment more complex; second, there are cases 
> where classloading causes exceptions to arise in unusual circumstances. For 
> example, the RabbitMQ connector caused issues only when it was auto-recovering 
> the connection, but not during normal ingest. I can elaborate further if 
> needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface

2018-11-16 Thread GitBox
twalthr commented on issue #3880: [FLINK-6457] [table] Clean up ScalarFunction 
and TableFunction interface
URL: https://github.com/apache/flink/pull/3880#issuecomment-439417094
 
 
   I also agree with @fhueske. I will close this PR for now. The affected 
classes might be ported to Java soon anyway due to FLINK-10689.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface

2018-11-16 Thread GitBox
twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction 
and TableFunction interface
URL: https://github.com/apache/flink/pull/3880
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 9f50f0c806a..1aab7ae0993 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -68,6 +68,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable.HashMap
 import _root_.scala.annotation.varargs
+import _root_.scala.util.{Try, Success, Failure}
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -340,10 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) {
 // check if class could be instantiated
 checkForInstantiation(function.getClass)
 
-val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
-  function.getResultType
-} else {
-  implicitly[TypeInformation[T]]
+val typeInfo: TypeInformation[_] = Try {
+  function.getClass.getDeclaredMethod("getResultType")
+} match {
+  case Success(m) => m.invoke(function).asInstanceOf[TypeInformation[_]]
+  case Failure(_) => implicitly[TypeInformation[T]]
 }
 
 // register in Table API
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
new file mode 100644
index 000..d781e76071d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
+import org.apache.flink.table.functions.ScalarFunction
+
+/**
+  * Holds methods to convert a [[ScalarFunction]] call
+  * in the Scala Table API into a [[ScalarFunctionCall]].
+  *
+  * @param sf The ScalarFunction to convert.
+  */
+class ScalarFunctionConversions(sf: ScalarFunction) {
+  /**
+* Creates a [[ScalarFunctionCall]] in Scala Table API.
+*
+* @param params actual parameters of function
+* @return [[Expression]] in form of a [[ScalarFunctionCall]]
+*/
+  final def apply(params: Expression*): Expression = {
+ScalarFunctionCall(sf, params)
+  }
+}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
index 692876fe2da..292e241077a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
@@ -24,6 +24,8 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
 
+import scala.util.{Failure, Success, Try}
+
 /**
   * Holds methods to convert a [[TableFunction]] call in the Scala Table API 
into a [[Table]].
   *
@@ -39,7 +41,12 @@ class TableFunctionConversions[T](tf: TableFunction[T]) {
 */
   final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): 
Table = {
 
-val resultType = if (tf.getResultType == null) typeInfo else 
tf.getResultType
+val resultType: TypeInformation[_] = Try {
+  

[jira] [Commented] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print

2018-11-16 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689512#comment-16689512
 ] 

Fabian Hueske commented on FLINK-10889:
---

Hmm, OK. I see. 
This (or a similar) behavior would in fact be needed for Zeppelin. 
Unfortunately, DataStream.print() is a public API that we should avoid breaking. 
I know it is only declared as {{@PublicEvolving}} but still.
Maybe we can introduce another method for this.

We have been thinking about a REST service for SQL queries that also supports 
streaming results back. Although further out, this might be a better 
approach in the long term.

> Semantic inconsistency between DataSet#print and DataStream#print
> -
>
> Key: FLINK-10889
> URL: https://issues.apache.org/jira/browse/FLINK-10889
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: Jeff Zhang
>Assignee: vinoyang
>Priority: Major
>
> DataSet#print will print the result on the client side, while DataStream#print 
> will print the result on the TM. This inconsistency will confuse users. IMHO, we 
> should make the behavior consistent between DataSet and DataStream; I prefer 
> to print the result on the client side. Regarding DataStream#print, we can use 
> DataStreamUtils#collect to print it on the client side.
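
For readers unfamiliar with that utility, a minimal sketch of the client-side printing approach mentioned above; the generated sequence is just an example source, and the package location of DataStreamUtils may differ across Flink versions:

import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSidePrintSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.generateSequence(1, 10);

        // DataStreamUtils.collect streams the results back to the client,
        // so printing happens in the client JVM instead of on the TaskManagers.
        Iterator<Long> results = DataStreamUtils.collect(stream);
        while (results.hasNext()) {
            System.out.println(results.next());
        }
    }
}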



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689525#comment-16689525
 ] 

ASF GitHub Bot commented on FLINK-10869:


StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234235828
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_s3.sh
 ##
 @@ -68,8 +68,8 @@ function s3_setup {
   trap s3_cleanup EXIT
 
   cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
 
 Review comment:
   I don't know all the implications of changing/removing these variables in 
bash scripts ;-)
   It is also orthogonal to the change here; I would like to leave this for a 
dedicated cleanup step.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`
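
For illustration, a sketch of how a test could resolve these variables and skip itself when they are absent; the class and method names are assumptions, and using a JUnit Assume is just one possible convention:

import static org.junit.Assume.assumeTrue;

public class S3TestEnvSketch {

    public static void assumeCredentialsAvailable() {
        // The environment variables documented above; tests should be skipped,
        // not failed, when they are not configured.
        String bucket = System.getenv("IT_CASE_S3_BUCKET");
        String accessKey = System.getenv("IT_CASE_S3_ACCESS_KEY");
        String secretKey = System.getenv("IT_CASE_S3_SECRET_KEY");
        assumeTrue("S3 test credentials are not configured",
            bucket != null && accessKey != null && secretKey != null);
    }
}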



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-16 Thread GitBox
StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234235828
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_s3.sh
 ##
 @@ -68,8 +68,8 @@ function s3_setup {
   trap s3_cleanup EXIT
 
   cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
 
 Review comment:
   I don't know all the implications of changing/removing these variables in 
bash scripts ;-)
   It is also orthogonal to the change here; I would like to leave this for a 
dedicated cleanup step.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10903) Shade Internal Akka Dependencies

2018-11-16 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689718#comment-16689718
 ] 

Stephan Ewen commented on FLINK-10903:
--

Are you referring specifically to the akka dependency, or to the Scala 
dependency as a whole?

> Shade Internal Akka Dependencies
> 
>
> Key: FLINK-10903
> URL: https://issues.apache.org/jira/browse/FLINK-10903
> Project: Flink
>  Issue Type: Wish
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Akka is not a publicly exposed API, but it is something that forces developers 
> (particularly in Scala) to use an older version. It would be nice if it were 
> shaded so that developers are free to use the version of their choosing 
> without needing to worry about binary backwards compatibility, or in cases 
> where a user is forced to use parent-first classloading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689673#comment-16689673
 ] 

ASF GitHub Bot commented on FLINK-10880:


tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude 
JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation
URL: https://github.com/apache/flink/pull/7129#discussion_r234281426
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 ##
 @@ -106,6 +106,7 @@
/**
 * This option specifies the failover strategy, i.e. how the job 
computation recovers from task failures.
 */
+   @Documentation.ExcludeFromDocumentation
 
 Review comment:
   Good idea. Will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Failover strategies should not be applied to Batch Execution
> 
>
> Key: FLINK-10880
> URL: https://issues.apache.org/jira/browse/FLINK-10880
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.2
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> When configuring a failover strategy other than "full", DataSet/Batch 
> execution is currently not correct.
> This is expected: the failover region strategy is an experimental WIP feature 
> for streaming that has not been extended to the DataSet API.
> We need to document this and prevent execution of DataSet programs with 
> failover strategies other than "full".
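
For context, a minimal sketch of pinning the option discussed above back to its default, the only value that is currently safe for DataSet programs; this assumes a programmatically built configuration rather than flink-conf.yaml:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class FailoverStrategySketch {

    public static Configuration defaultFailoverConfig() {
        Configuration config = new Configuration();
        // "full" restarts all tasks on a failure and is currently the only
        // strategy that is correct for DataSet/batch execution.
        config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        return config;
    }
}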



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6036) Let catalog support partition

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689489#comment-16689489
 ] 

ASF GitHub Bot commented on FLINK-6036:
---

twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support 
partition
URL: https://github.com/apache/flink/pull/3569
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf7588f4..1b07fd8998d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.api
 
+import com.google.common.base.Joiner
 import org.apache.flink.table.catalog.TableSourceConverter
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * Exception for all errors occurring during expression parsing.
@@ -74,6 +76,50 @@ object ValidationException {
   */
 case class UnresolvedException(msg: String) extends RuntimeException(msg)
 
+/**
+  * Exception for an operation on a nonexistent partition
+  *
+  * @param dbdatabase name
+  * @param table table name
+  * @param partitionSpec partition spec
+  * @param cause the cause
+  */
+case class PartitionNotExistException(
+db: String,
+table: String,
+partitionSpec: PartitionSpec,
+cause: Throwable)
+extends RuntimeException(
+  s"Partition 
[${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+  s"does not exist in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+this(db, table, partitionSpec, null)
+
+}
+
+/**
+  * Exception for adding an already existent partition
+  *
+  * @param dbdatabase name
+  * @param table table name
+  * @param partitionSpec partition spec
+  * @param cause the cause
+  */
+case class PartitionAlreadyExistException(
+db: String,
+table: String,
+partitionSpec: PartitionSpec,
+cause: Throwable)
+extends RuntimeException(
+  s"Partition 
[${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+  s"already exists in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+this(db, table, partitionSpec, null)
+
+}
+
 /**
   * Exception for an operation on a nonexistent table
   *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index fcefa45fcc5..78798ce0b72 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -19,12 +19,81 @@
 package org.apache.flink.table.catalog
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * The CrudExternalCatalog provides methods to create, drop, and alter 
databases or tables.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
+  /**
+* Adds a partition to the catalog.
+*
+* @param dbName The name of the table's database.
+* @param tableName  The name of the table.
+* @param part   Description of the partition to add.
+* @param ignoreIfExists Flag to specify behavior if a partition with the 
given spec
+*   already exists:
+*   if set to false, it throws a 
PartitionAlreadyExistException,
+*   if set to true, nothing happens.
+* @throws DatabaseNotExistException  thrown if the database does not 
exist in the catalog.
+* @throws TableNotExistException thrown if the table does not 
exist in the catalog.
+* @throws PartitionAlreadyExistException thrown if the partition already 
exists and
+*ignoreIfExists is false
+*/
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionAlreadyExistException]
+  def createPartition(
+  dbName: String,
+  tableName: String,
+  part: ExternalCatalogTablePartition,
+  ignoreIfExists: Boolean): Unit
+
+  /**
+* Deletes partition from a database of the catalog.
+*
+* @param dbNameThe name of the 

[GitHub] StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-16 Thread GitBox
StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] 
Update S3 tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#discussion_r234236294
 
 

 ##
 File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
 ##
 @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException {
 
@Test(expected = UnsupportedOperationException.class)
public void requestingRecoverableWriterShouldThroughException() throws 
Exception {
-   FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) 
FileSystem.get(basePath.toUri());
+   URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri());
 
 Review comment:
   It never used the `TEST_DATA_DIR` - `getFileSystem()` only uses the scheme 
and authority; the path does not matter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta

2018-11-16 Thread GitBox
pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. Thanks for 
pointing it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689566#comment-16689566
 ] 

ASF GitHub Bot commented on FLINK-10900:


pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234247629
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -73,21 +73,33 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message;>Kafka
 messages with timestamps both for producing and consuming.
 
 
-flink-connector-kafka-0.11_2.11
+flink-connector-kafka-0.11{{ site.scala_version_suffix }}
 1.4.0
 FlinkKafkaConsumer011
 FlinkKafkaProducer011
 0.11.x
 Since 0.11.x Kafka does not support scala 2.10. This connector 
supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging;>Kafka
 transactional messaging to provide exactly once semantic for the 
producer.
 
 
-flink-connector-kafka_2.11
+flink-connector-kafka{{ site.scala_version_suffix }}
 1.7.0
 FlinkKafkaConsumer
 FlinkKafkaProducer
 >= 1.0.0
-This Kafka connector attempts to track the latest version of the 
Kafka client. The version of the client it uses may change between Flink 
releases. Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend 
using dedicated flink-connector-kafka-0.11 and link-connector-kafka-0.10 
respectively.
-
+
+This universal Kafka connector attempts to track the latest version of 
the Kafka client.
+The version of the client it uses may change between Flink releases.
+Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later.
+However for Kafka 0.11.x and 0.10.x versions, we recommend using 
dedicated
+flink-connector-kafka-0.11 and link-connector-kafka-0.10 respectively.
+
+  Attention: as of Flink 1.7 the universal Kafka 
connector is considered to be
+  in a BETA status and might not be as stable as the 
0.11 connector.
+  In case of problems with the universal connector, you can try to use 
flink-connector-kafka-0.11{{ site.scala_version_suffix }}
+  which should be compatible with all of the Kafka versions starting 
from 0.11.
 
 Review comment:
   This is a good point, but as far as I know this is relevant only if someone 
wants to upgrade `kafka.version` of the `flink-connector-kafka`. There were a 
couple of users successfully using `flink-connector-kafka-0.11` against Kafka 
1.0, and it should work for the same reasons the `universal` connector should 
work with any Kafka version `0.11+`.
   
   I would like to merge this before the next release candidate, but if I'm 
wrong in this regard, please let me know here - we can always fix the 
documentation after the release :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.
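
For reference, a minimal sketch of using the unversioned classes that the documentation change above refers to; the broker address, group id, and topic are placeholder assumptions:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalKafkaConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        props.setProperty("group.id", "beta-connector-test");     // assumed group id

        // The universal connector drops the version suffix from the class names.
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("universal-kafka-consumer-sketch");
    }
}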



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta

2018-11-16 Thread GitBox
pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. 
Thanks for pointing it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta

2018-11-16 Thread GitBox
pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. Thanks 
for pointing it out - I have fixed all `modern` references.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski closed pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta

2018-11-16 Thread GitBox
pnowojski closed pull request #7109: [FLINK-10900][kafka][docs] Mark universal 
Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index f81080a4b53..0630c6ec7d6 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -73,7 +73,7 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message;>Kafka
 messages with timestamps both for producing and consuming.
 
 
-flink-connector-kafka-0.11_2.11
+flink-connector-kafka-0.11{{ site.scala_version_suffix }}
 1.4.0
 FlinkKafkaConsumer011
 FlinkKafkaProducer011
@@ -81,13 +81,25 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 Since 0.11.x Kafka does not support scala 2.10. This connector 
supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging;>Kafka
 transactional messaging to provide exactly once semantic for the 
producer.
 
 
-flink-connector-kafka_2.11
+flink-connector-kafka{{ site.scala_version_suffix }}
 1.7.0
 FlinkKafkaConsumer
 FlinkKafkaProducer
 >= 1.0.0
-This Kafka connector attempts to track the latest version of the 
Kafka client. The version of the client it uses may change between Flink 
releases. Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend 
using dedicated flink-connector-kafka-0.11 and link-connector-kafka-0.10 
respectively.
-
+
+This universal Kafka connector attempts to track the latest version of 
the Kafka client.
+The version of the client it uses may change between Flink releases.
+Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later.
+However for Kafka 0.11.x and 0.10.x versions, we recommend using 
dedicated
+flink-connector-kafka-0.11{{ site.scala_version_suffix }} and 
link-connector-kafka-0.10{{ site.scala_version_suffix }} respectively.
+
+  Attention: as of Flink 1.7 the universal Kafka 
connector is considered to be
+  in a BETA status and might not be as stable as the 
0.11 connector.
+  In case of problems with the universal connector, you can try to use 
flink-connector-kafka-0.11{{ site.scala_version_suffix }}
+  which should be compatible with all of the Kafka versions starting 
from 0.11.
+
+
+
   
 
 
@@ -101,7 +113,8 @@ Then, import the connector in your maven project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently not part of the binary 
distribution.
+See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
 
 ## Installing Apache Kafka
 
@@ -110,17 +123,21 @@ Note that the streaming connectors are currently not part 
of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new universal Kafka connector that does 
not track a specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The universal Kafka connector is compatible with older and newer Kafka brokers 
through 

[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689579#comment-16689579
 ] 

ASF GitHub Bot commented on FLINK-10900:


pnowojski commented on a change in pull request #7109: 
[FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
URL: https://github.com/apache/flink/pull/7109#discussion_r234248328
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 
 ## Kafka 1.0.0+ Connector
 
-Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version. Rather, it tracks the latest version of Kafka at 
the time of the Flink release.
+Starting with Flink 1.7, there is a new Kafka connector that does not track a 
specific Kafka major version.
+Rather, it tracks the latest version of Kafka at the time of the Flink release.
 
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you 
should use the connector corresponding to the broker version.
+If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
+If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use 
the connector corresponding to the broker version.
 
 ### Compatibility
 
-The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker. The 
modern Kafka connector is compatible with broker versions 0.11.0 or later, 
depending on the features used. For details on Kafka compatibility, please 
refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+The modern Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
 
 Review comment:
   I was just breaking lines here, but I can fix this while I'm at it. Thanks 
for pointing it out - I have fixed all `modern` references.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10906) docker-entrypoint.sh logs credentials during startup

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10906:
---
Labels: pull-request-available  (was: )

> docker-entrypoint.sh logs credentials during startup
> 
>
> Key: FLINK-10906
> URL: https://issues.apache.org/jira/browse/FLINK-10906
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Reporter: Konstantin Knauf
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
>
>  {{docker-entrypoint.sh}} prints the full flink-configuration file including 
> potentially sensitive configuration entries.
> To be consistent we should hide these as in {{GlobalConfiguration}}.
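
For illustration only, a sketch of the kind of masking meant here; the key patterns are assumptions in the spirit of GlobalConfiguration's handling, not its actual implementation:

public class SensitiveConfigMaskingSketch {

    private static final String HIDDEN = "******";

    /** Returns the value to log for a given config entry, masking likely secrets. */
    public static String displayValue(String key, String value) {
        String lower = key.toLowerCase();
        // Heuristic patterns for sensitive keys; the real code may use a different list.
        if (lower.contains("password") || lower.contains("secret") || lower.contains("key")) {
            return HIDDEN;
        }
        return value;
    }
}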



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #7127: [BP-1.6][FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh

2018-11-16 Thread GitBox
tillrohrmann opened a new pull request #7127: [BP-1.6][FLINK-10906][docker] 
Don't print configuration in docker-entrypoint.sh
URL: https://github.com/apache/flink/pull/7127
 
 
   Backport of #7125 to `release-1.6`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

