[GitHub] tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test
tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7111#issuecomment-439315975 Thanks for the review @zentol. Merging.
[jira] [Resolved] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom
[ https://issues.apache.org/jira/browse/FLINK-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10877.
Resolution: Fixed
Fixed via
master: https://github.com/apache/flink/commit/00274233b40230559f942ae13fc1d19e54f79fbe
1.7.0: https://github.com/apache/flink/commit/4c52cdbda1fb13754bad4c5ad4a7eb81ba3884f5

> Remove duplicate dependency entries in kafka connector pom
> ----------------------------------------------------------
>
> Key: FLINK-10877
> URL: https://issues.apache.org/jira/browse/FLINK-10877
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Trivial
> Labels: pull-request-available
>
> The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency
> entries for {{flink-connector-kafka-base}} and moreover excludes dependencies
> from a test-jar. This is not necessary.
[jira] [Commented] (FLINK-10705) Rework Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-10705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689145#comment-16689145 ] Fabian Wollert commented on FLINK-10705:
Build and deployment to our dev cluster worked fine. Since I'm not familiar with Angular and/or TypeScript, I can't review the actual code, unfortunately. And since there is not much content/functionality to check yet, I can't be of much help here so far :(

> Rework Flink Web Dashboard
> --------------------------
>
> Key: FLINK-10705
> URL: https://issues.apache.org/jira/browse/FLINK-10705
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Affects Versions: 1.6.2
> Reporter: Fabian Wollert
> Assignee: Fabian Wollert
> Priority: Major
> Fix For: 1.8.0
>
> Attachments: image-2018-10-29-09-17-24-115.png
>
> The Flink Dashboard is currently very simple and should be updated. This is
> the umbrella ticket for other tickets regarding this. Please check the
> sub-tickets for details.
[GitHub] tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup flink-connector-kafka pom file
tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup flink-connector-kafka pom file URL: https://github.com/apache/flink/pull/7094
As GitHub hides the original diff of a merged fork pull request, the change is summarized below for provenance. In flink-connectors/flink-connector-kafka/pom.xml (index b90cea2093a..67ec39d8f27), the duplicated flink-connector-kafka-base_${scala.binary.version} test-jar dependency declaration and the exclusions of kafka_${scala.binary.version} are removed, leaving a single flink-connector-kafka-base dependency with test-jar type and test scope.
[jira] [Commented] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom
[ https://issues.apache.org/jira/browse/FLINK-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689146#comment-16689146 ] ASF GitHub Bot commented on FLINK-10877:
tillrohrmann closed pull request #7094: [FLINK-10877] Cleanup flink-connector-kafka pom file URL: https://github.com/apache/flink/pull/7094 (the change is summarized in the pull request message above)

> Remove duplicate dependency entries in kafka connector pom
> ----------------------------------------------------------
>
> Key: FLINK-10877
> URL: https://issues.apache.org/jira/browse/FLINK-10877
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Trivial
> Labels: pull-request-available
>
> The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency
> entries for {{flink-connector-kafka-base}} and moreover excludes dependencies
> from a test-jar. This is not necessary.
[GitHub] tillrohrmann closed pull request #7113: [BP-1.6][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test
tillrohrmann closed pull request #7113: [BP-1.6][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7113
As GitHub hides the original diff of a merged fork pull request, it is reproduced below for provenance:

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 9719e183cf0..5efdf0a974a 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -659,3 +659,10 @@ function expect_in_taskmanager_logs {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."
[GitHub] tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test
tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7111
As GitHub hides the original diff of a merged fork pull request, it is reproduced below for provenance:

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ea267538639..4e6254864c1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -656,3 +656,10 @@ function wait_for_restart_to_complete {
     fi
   done
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index c1477574d5d..35fe30b6b25 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -112,7 +112,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."
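The shell function above encodes the rule at the heart of FLINK-10856: a chk-<n> directory only counts as a completed checkpoint once Flink has written its _metadata file, so the test must pick the highest-numbered directory that contains one. Below is a minimal, self-contained Java sketch of the same selection logic, for illustration only; the class and method names are invented here and are not part of Flink:

{code:java}
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class LatestCheckpointFinder {

    /**
     * Returns the chk-<n> directory with the highest checkpoint counter that
     * already contains a _metadata file, i.e. the latest completed checkpoint.
     * Mirrors: ls -d <root>/chk-[1-9]*​/_metadata | sort -Vr | head -n1
     */
    static Optional<File> findLatestCompletedCheckpoint(File checkpointRoot) {
        File[] completed = checkpointRoot.listFiles(f ->
            f.isDirectory()
                && f.getName().startsWith("chk-")
                && new File(f, "_metadata").exists());

        if (completed == null || completed.length == 0) {
            return Optional.empty();
        }

        // Numeric comparison of the counter matches the version sort (sort -V)
        // used in the script, so chk-10 correctly outranks chk-9.
        return Arrays.stream(completed).max(
            Comparator.comparingLong(f -> Long.parseLong(f.getName().substring(4))));
    }

    public static void main(String[] args) {
        Optional<File> latest = findLatestCompletedCheckpoint(new File(args[0]));
        if (latest.isPresent()) {
            System.out.println("Latest completed checkpoint: " + latest.get());
        } else {
            System.out.println("Expected an externalized checkpoint to be present, but none exists.");
        }
    }
}
{code}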
[GitHub] tillrohrmann closed pull request #7112: [BP-1.7][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test
tillrohrmann closed pull request #7112: [BP-1.7][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7112
This backport carries the same change as #7111; the merged diff (common.sh index ea267538639..4e6254864c1, test_resume_externalized_checkpoints.sh index c1477574d5d..35fe30b6b25) is identical to the one reproduced in the message for pull request #7111 above.
[jira] [Commented] (FLINK-10856) Harden resume from externalized checkpoint E2E test
[ https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689148#comment-16689148 ] ASF GitHub Bot commented on FLINK-10856:
tillrohrmann commented on issue #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7111#issuecomment-439315975 Thanks for the review @zentol. Merging.

> Harden resume from externalized checkpoint E2E test
> ---------------------------------------------------
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
> Issue Type: Bug
> Components: E2E Tests, State Backends, Checkpointing
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> The resume from externalized checkpoints E2E test can fail due to
> FLINK-10855. We should harden the test script to not expect a single
> checkpoint directory being present but to take the checkpoint with the
> highest checkpoint counter.
[GitHub] tillrohrmann closed pull request #7114: [BP-1.5][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test
tillrohrmann closed pull request #7114: [BP-1.5][FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7114
As GitHub hides the original diff of a merged fork pull request, it is reproduced below for provenance:

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 018e12d6df0..8b370582149 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -531,3 +531,10 @@ function clean_log_files {
   rm ${FLINK_DIR}/log/*
   echo "Deleted all files under ${FLINK_DIR}/log/"
 }
+
+function find_latest_completed_checkpoint {
+  local checkpoint_root_directory=$1
+  # a completed checkpoint must contain the _metadata file
+  local checkpoint_meta_file=$(ls -d ${checkpoint_root_directory}/chk-[1-9]*/_metadata | sort -Vr | head -n1)
+  echo "$(dirname "${checkpoint_meta_file}")"
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e0b97d6f6a8..f406784b4c5 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -113,7 +113,7 @@ else
 fi
 
 # take the latest checkpoint
-CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]* | sort -Vr | head -n1)
+CHECKPOINT_PATH=$(find_latest_completed_checkpoint ${CHECKPOINT_DIR}/${DATASTREAM_JOB})
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689159#comment-16689159 ] ASF GitHub Bot commented on FLINK-10843:
twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439317653 @pnowojski I updated the PR. Now we just match against the version `universal`.

> Make Kafka version definition more flexible for new Kafka table factory
> ------------------------------------------------------------------------
>
> Key: FLINK-10843
> URL: https://issues.apache.org/jira/browse/FLINK-10843
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Table API SQL
> Affects Versions: 1.7.0
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> Currently, a user has to specify a specific version for a Kafka connector
> like:
> {code}
> connector:
>   type: kafka
>   version: "0.11"  # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
>   topic: ...       # required: topic name from which the table is read
> {code}
> However, the new Kafka connector aims to be universal; thus we should also
> support at least the 1.x and 2.x versions as parameters. Currently, {{2.0}}
> is the only string accepted by the factory.
[GitHub] twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible
twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439317653 @pnowojski I updated the PR. Now we just match against the version `universal`.
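For Table API users, the practical effect is that the version-agnostic connector can now be declared with the single string universal instead of one of the broker-specific version strings. A rough sketch using the Kafka table descriptor as it stood around Flink 1.7 (the topic and bootstrap-server values are placeholders; the resulting descriptor would then be handed to tableEnv.connect(...) together with format and schema descriptors):

{code:java}
import org.apache.flink.table.descriptors.Kafka;

public class UniversalKafkaVersion {
    public static void main(String[] args) {
        // "universal" replaces the broker-specific "0.8"/"0.9"/"0.10"/"0.11"
        // version strings for the new, version-agnostic Kafka connector.
        Kafka kafka = new Kafka()
            .version("universal")
            .topic("example-topic")                           // placeholder
            .property("bootstrap.servers", "localhost:9092")  // placeholder
            .startFromLatest();

        System.out.println("Declared connector descriptor: " + kafka);
    }
}
{code}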
[GitHub] ambition119 opened a new pull request #7120: fix up RowSerializer.copy occur ClassCastException
ambition119 opened a new pull request #7120: fix up RowSerializer.copy occur ClassCastException URL: https://github.com/apache/flink/pull/7120
## What is the purpose of the change
This pull request fixes a ClassCastException that can occur in RowSerializer.copy. I created JIRA [FLINK-10299](https://issues.apache.org/jira/browse/FLINK-10299) for it.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not documented
[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xymaqingxiang updated FLINK-10908:
Description: Add yarn.application.priority to YarnConfigOptions to allow changing the priority of the Flink YARN application. Allowed priority values are 5, 4, 3, 2, and 1, corresponding to VERY_HIGH, HIGH, NORMAL, LOW, and VERY_LOW, respectively.
was: Add yarn.application.priority in YarnConfigOptions, Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.

> Add yarn.application.priority in YarnConfigOptions
> --------------------------------------------------
>
> Key: FLINK-10908
> URL: https://issues.apache.org/jira/browse/FLINK-10908
> Project: Flink
> Issue Type: Improvement
> Reporter: xymaqingxiang
> Priority: Major
> Labels: pull-request-available
>
> Add yarn.application.priority to YarnConfigOptions to allow changing the
> priority of the Flink YARN application.
> Allowed priority values are 5, 4, 3, 2, and 1, corresponding to VERY_HIGH,
> HIGH, NORMAL, LOW, and VERY_LOW, respectively.
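Assuming the option lands as described in the ticket, a client could set it like any other Flink configuration value. A sketch follows; the key name is taken from the ticket, and reading it as an integer is an assumption here:

{code:java}
import org.apache.flink.configuration.Configuration;

public class YarnApplicationPriority {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Proposed key from FLINK-10908. Values 5..1 map to VERY_HIGH, HIGH,
        // NORMAL, LOW and VERY_LOW respectively; 3 (NORMAL) is just an example.
        conf.setInteger("yarn.application.priority", 3);

        System.out.println("yarn.application.priority = "
            + conf.getInteger("yarn.application.priority", -1));
    }
}
{code}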
[GitHub] tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7115
As GitHub hides the original diff of a merged fork pull request, it is reproduced below for provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		deploymentFuture.whenComplete(
 			(Void ignored, Throwable failure) -> {
 				if (failure != null) {
-					markFailed(ExceptionUtils.stripCompletionException(failure));
+					final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+					final Throwable schedulingFailureCause;
+
+					if (stripCompletionException instanceof TimeoutException) {
+						schedulingFailureCause = new NoResourceAvailableException(
+							"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
+							"Please make sure that the cluster has enough resources.");
+					} else {
+						schedulingFailureCause = stripCompletionException;
+					}
+
+					markFailed(schedulingFailureCause);
 				}
 			});
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
 			final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 				slotProvider,
 				allowQueuedScheduling,
-				LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+				LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
 				Collections.emptySet());
 
 			schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
 	}
 
 	@Test
-	public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+	public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
+		try {
+			setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+			fail("Job should fail.");
+		}
[GitHub] tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7115#issuecomment-439316401 Thanks for the review @zentol. Merging.
[jira] [Closed] (FLINK-10856) Harden resume from externalized checkpoint E2E test
[ https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10856.
Resolution: Fixed
Fixed via
master: https://github.com/apache/flink/commit/bddfe23d7df18099982958d21f7bcd01039909b9
1.7.0: https://github.com/apache/flink/commit/6c21335004ce487e104bc85bd3aae211e1319a20
1.6.3: https://github.com/apache/flink/commit/b4e6d9b03facaf572caddaed5a70afcbffd083a2
1.5.6: https://github.com/apache/flink/commit/0c27eda46ccf88ca794bb7831a788df303a5967e

> Harden resume from externalized checkpoint E2E test
> ---------------------------------------------------
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
> Issue Type: Bug
> Components: E2E Tests, State Backends, Checkpointing
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> The resume from externalized checkpoints E2E test can fail due to
> FLINK-10855. We should harden the test script to not expect a single
> checkpoint directory being present but to take the checkpoint with the
> highest checkpoint counter.
[GitHub] tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7117
This backport carries the Execution.java and ExecutionGraph.java changes of #7115; the merged diff (Execution.java index cbf51037b8b..e3b501e52e8, ExecutionGraph.java index 3b55e009116..56315e07146) is identical to the corresponding hunks reproduced in the message for pull request #7115 above.
[GitHub] tillrohrmann closed pull request #7116: [BP-1.7][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
tillrohrmann closed pull request #7116: [BP-1.7][FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7116
This backport carries the same change as #7115; the merged diff (Execution.java index cbf51037b8b..e3b501e52e8, ExecutionGraph.java index 3b55e009116..56315e07146, MiniClusterITCase.java index c108eaee36a..585e1badf47) is identical to the one reproduced in the message for pull request #7115 above.
[jira] [Commented] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
[ https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689153#comment-16689153 ] ASF GitHub Bot commented on FLINK-10883:
tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7115 (the merged diff is reproduced in the pull request message above)

> Submitting a job without enough slots times out due to an unspecified timeout
> ------------------------------------------------------------------------------
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
> Issue Type: Improvement
> Components: Job-Submission
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
>
> When submitting a job without enough slots being available the job will stay
> in a SCHEDULED/CREATED state. After some time (a few minutes) the job
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException
> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> That the job submission may time out is not documented, neither is which
> timeout is responsible in the first place nor how/whether this can be
> disabled.
[jira] [Resolved] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
[ https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10883.
Resolution: Fixed
Fixed via
master: https://github.com/apache/flink/commit/b1965eeb854f16f385329a546d47d684e2891d70
1.7.0: https://github.com/apache/flink/commit/f5db114940fad1b96b549801c80b312a9367a55a
1.6.3: https://github.com/apache/flink/commit/fd6c3eea8c8c9d371e2cdc5e98ce650be5b5fc37

> Submitting a job without enough slots times out due to an unspecified timeout
> ------------------------------------------------------------------------------
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
> Issue Type: Improvement
> Components: Job-Submission
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
> When submitting a job without enough slots being available the job will stay
> in a SCHEDULED/CREATED state. After some time (a few minutes) the job
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException
> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> That the job submission may time out is not documented, neither is which
> timeout is responsible in the first place nor how/whether this can be
> disabled.
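For anyone running into this, the timeout that now surfaces as a NoResourceAvailableException is the pending-slot-request timeout, which is itself configurable. A sketch, assuming the slot.request.timeout key (in milliseconds) that governs pending slot requests on the JobManager:

{code:java}
import org.apache.flink.configuration.Configuration;

public class SlotRequestTimeout {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Allow up to 10 minutes for slots to become available before the job
        // fails with NoResourceAvailableException (the default is assumed to be
        // 5 minutes here).
        conf.setLong("slot.request.timeout", 10 * 60 * 1000L);

        System.out.println("slot.request.timeout = "
            + conf.getLong("slot.request.timeout", 5 * 60 * 1000L) + " ms");
    }
}
{code}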
[jira] [Commented] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
[ https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689151#comment-16689151 ] ASF GitHub Bot commented on FLINK-10883:
tillrohrmann commented on issue #7115: [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out URL: https://github.com/apache/flink/pull/7115#issuecomment-439316401 Thanks for the review @zentol. Merging.

> Submitting a job without enough slots times out due to an unspecified timeout
> ------------------------------------------------------------------------------
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
> Issue Type: Improvement
> Components: Job-Submission
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
> When submitting a job without enough slots being available the job will stay
> in a SCHEDULED/CREATED state. After some time (a few minutes) the job
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException
> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> That the job submission may time out is not documented, neither is which
> timeout is responsible in the first place nor how/whether this can be
> disabled.
[jira] [Commented] (FLINK-10856) Harden resume from externalized checkpoint E2E test
[ https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689149#comment-16689149 ] ASF GitHub Bot commented on FLINK-10856:
tillrohrmann closed pull request #7111: [FLINK-10856] Find latest completed checkpoint for resume from externalized checkpoint e2e test URL: https://github.com/apache/flink/pull/7111 (the merged diff is reproduced in the pull request message above)

> Harden resume from externalized checkpoint E2E test
> ---------------------------------------------------
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
> Issue Type: Bug
> Components: E2E Tests, State Backends, Checkpointing
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> The resume from externalized checkpoints E2E test can fail due to
> FLINK-10855. We should harden the test script to not expect a single
> checkpoint directory being present but to take the checkpoint with the
> highest checkpoint counter.
[jira] [Created] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
xymaqingxiang created FLINK-10908:
Summary: Add yarn.application.priority in YarnConfigOptions
Key: FLINK-10908
URL: https://issues.apache.org/jira/browse/FLINK-10908
Project: Flink
Issue Type: Improvement
Reporter: xymaqingxiang
[GitHub] zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery.
zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery. URL: https://github.com/apache/flink/pull/7119
As GitHub hides the original diff of a merged fork pull request, it is reproduced below for provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index e09051d7160..f67b49d6745 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -61,16 +61,17 @@ public String hostname() {
 	public JobManagerJobMetricGroup addJob(JobGraph job) {
 		JobID jobId = job.getJobID();
 		String jobName = job.getName();
-		// get or create a jobs metric group
-		JobManagerJobMetricGroup currentJobGroup;
 		synchronized (this) {
 			if (!isClosed()) {
-				currentJobGroup = jobs.get(jobId);
+				JobManagerJobMetricGroup currentJobGroup = jobs.get(jobId);
 
-				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
-					jobs.put(jobId, currentJobGroup);
+				if (currentJobGroup != null) {
+					currentJobGroup.close();
 				}
+
+				currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
+				jobs.put(jobId, currentJobGroup);
+
 				return currentJobGroup;
 			} else {
 				return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index cb5ec67c97c..146fb3b1f45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -32,6 +32,7 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -58,13 +59,15 @@ public void addAndRemoveJobs() throws Exception {
 		JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new JobGraph(jid1, jobName1));
 		JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2));
 
-		assertEquals(jmJobGroup11, jmJobGroup12);
+		assertNotEquals(jmJobGroup11, jmJobGroup12);
+		assertTrue(jmJobGroup11.isClosed());
+		assertTrue(!jmJobGroup12.isClosed());
 
 		assertEquals(2, group.numRegisteredJobMetricGroups());
 
 		group.removeJob(jid1);
 
-		assertTrue(jmJobGroup11.isClosed());
+		assertTrue(jmJobGroup12.isClosed());
 		assertEquals(1, group.numRegisteredJobMetricGroups());
 
 		group.removeJob(jid2);
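The fix reduces to one rule: on (re)submission, never hand out a possibly stale per-job metric group; close whatever is registered under the JobID and register a fresh group. A generic, self-contained sketch of that close-and-recreate pattern over a synchronized map; the names here are illustrative and not Flink's:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CloseAndRecreateRegistry {

    static class JobMetricGroup {
        private final String jobId;
        private volatile boolean closed;

        JobMetricGroup(String jobId) { this.jobId = jobId; }

        void close() { closed = true; }

        @Override
        public String toString() {
            return "JobMetricGroup(" + jobId + ", closed=" + closed + ")";
        }
    }

    private final Map<String, JobMetricGroup> jobs = new HashMap<>();

    /**
     * Mirrors the patched JobManagerMetricGroup#addJob: any existing group for
     * this job is closed and replaced, so a recovered job never re-registers
     * metrics against a stale group (which caused the name collisions above).
     */
    synchronized JobMetricGroup addJob(String jobId) {
        JobMetricGroup current = jobs.get(jobId);
        if (current != null) {
            current.close();
        }
        current = new JobMetricGroup(jobId);
        jobs.put(jobId, current);
        return current;
    }

    public static void main(String[] args) {
        CloseAndRecreateRegistry registry = new CloseAndRecreateRegistry();
        JobMetricGroup first = registry.addJob("job-1");
        JobMetricGroup second = registry.addJob("job-1"); // simulated recovery

        System.out.println(first);   // closed=true: stale group was shut down
        System.out.println(second);  // closed=false: fresh group for the recovered job
    }
}
{code}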
[jira] [Closed] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values
[ https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10907.
Resolution: Won't Fix
Not a problem in 1.5 and above. The JobManagerJobMetricGroup is closed when a {{JobMaster}} exits and is properly overwritten on recovery in {{JobManagerMetricGroup#addJob}}.

> Job recovery on the same JobManager causes JobManager metrics to report stale
> values
> ------------------------------------------------------------------------------
>
> Key: FLINK-10907
> URL: https://issues.apache.org/jira/browse/FLINK-10907
> Project: Flink
> Issue Type: Bug
> Components: Core, Metrics
> Affects Versions: 1.4.2
> Environment: Verified the bug and the fix running on Flink 1.4.
> Based on the JobManagerMetricGroup.java code in master, this issue should
> still occur on Flink versions after 1.4.
> Reporter: Mark Cho
> Priority: Minor
> Labels: pull-request-available
>
> https://github.com/apache/flink/pull/7119
> * JobManager loses and regains leadership if it loses connection and
> reconnects to ZooKeeper.
> * When it regains the leadership, it tries to recover the job graph.
> * During the recovery, it will try to reuse the existing
> {{JobManagerMetricGroup}} to register new counters and gauges under the same
> metric name, which causes the new counters and gauges to be registered
> incorrectly.
> * The old counters and gauges will continue to report the stale values and
> the new counters and gauges will not report the latest metric.
> Relevant lines from logs
> {code:java}
> com.---.JobManager - Submitting recovered job e9e49fd9b8c61cf54b435f39aa49923f.
> com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f (flink-job) (Recovery).
> com.---.JobManager - Running initialization on master for job flink-job (e9e49fd9b8c61cf54b435f39aa49923f).
> com.---.JobManager - Successfully ran initialization on master in 0 ms.
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointDuration'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointExternalPath'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'restartingTime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'downtime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'uptime'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'fullRestarts'. Metric will not be reported.[]
> org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'task_failures'. Metric will not be reported.[]
> {code}
[jira] [Commented] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values
[ https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689414#comment-16689414 ] ASF GitHub Bot commented on FLINK-10907: zentol closed pull request #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery. URL: https://github.com/apache/flink/pull/7119 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index e09051d7160..f67b49d6745 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -61,16 +61,17 @@ public String hostname() { public JobManagerJobMetricGroup addJob(JobGraph job) { JobID jobId = job.getJobID(); String jobName = job.getName(); - // get or create a jobs metric group - JobManagerJobMetricGroup currentJobGroup; synchronized (this) { if (!isClosed()) { - currentJobGroup = jobs.get(jobId); + JobManagerJobMetricGroup currentJobGroup = jobs.get(jobId); - if (currentJobGroup == null || currentJobGroup.isClosed()) { - currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName); - jobs.put(jobId, currentJobGroup); + if (currentJobGroup != null) { + currentJobGroup.close(); } + + currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName); + jobs.put(jobId, currentJobGroup); + return currentJobGroup; } else { return null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index cb5ec67c97c..146fb3b1f45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -58,13 +59,15 @@ public void addAndRemoveJobs() throws Exception { JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new JobGraph(jid1, jobName1)); JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2)); - assertEquals(jmJobGroup11, jmJobGroup12); + assertNotEquals(jmJobGroup11, jmJobGroup12); + assertTrue(jmJobGroup11.isClosed()); + assertTrue(!jmJobGroup12.isClosed()); assertEquals(2, group.numRegisteredJobMetricGroups()); group.removeJob(jid1); - assertTrue(jmJobGroup11.isClosed()); + assertTrue(jmJobGroup12.isClosed()); assertEquals(1, group.numRegisteredJobMetricGroups()); group.removeJob(jid2); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job recovery on the same JobManager causes JobManager metrics to report stale > values > > > Key: FLINK-10907 > URL: https://issues.apache.org/jira/browse/FLINK-10907 > Project: Flink > Issue Type: Bug > Components: Core, Metrics >Affects Versions: 1.4.2 > Environment: Verified the bug and the fix running on Flink 1.4 > Based on the JobManagerMetricGroup.java code in master, this issue should > still occur on Flink versions after 1.4. >Reporter: Mark Cho >Priority: Minor > Labels: pull-request-available > >
[GitHub] zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery.
zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery. URL: https://github.com/apache/flink/pull/7119#issuecomment-439391428 Not a problem in 1.5 and above, see the JIRA for more details. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values
[ https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689413#comment-16689413 ] ASF GitHub Bot commented on FLINK-10907: zentol commented on issue #7119: [FLINK-10907] Fix Flink JobManager metrics from getting stuck after a job recovery. URL: https://github.com/apache/flink/pull/7119#issuecomment-439391428 Not a problem in 1.5 and above, see the JIRA for more details. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job recovery on the same JobManager causes JobManager metrics to report stale > values > > > Key: FLINK-10907 > URL: https://issues.apache.org/jira/browse/FLINK-10907 > Project: Flink > Issue Type: Bug > Components: Core, Metrics >Affects Versions: 1.4.2 > Environment: Verified the bug and the fix running on Flink 1.4 > Based on the JobManagerMetricGroup.java code in master, this issue should > still occur on Flink versions after 1.4. >Reporter: Mark Cho >Priority: Minor > Labels: pull-request-available > > https://github.com/apache/flink/pull/7119 > * JobManager loses and regains leadership if it loses connection and > reconnects to ZooKeeper. > * When it regains the leadership, it tries to recover the job graph. > * During the recovery, it will try to reuse the existing > {{JobManagerMetricGroup}} to register new counters and gauges under the same > metric name, which causes the new counters and gauges to be registered > incorrectly. > * The old counters and gauges will continue to > report the stale values and the new counters and gauges will not report > the latest metric. > Relevant lines from logs > {code:java} > com.---.JobManager - Submitting recovered job > e9e49fd9b8c61cf54b435f39aa49923f. > com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f > (flink-job) (Recovery). > com.---.JobManager - Running initialization on master for job flink-job > (e9e49fd9b8c61cf54b435f39aa49923f). > com.---.JobManager - Successfully ran initialization on master in 0 ms. > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointDuration'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointAlignmentBuffered'. 
Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointExternalPath'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'restartingTime'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'downtime'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'uptime'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'fullRestarts'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'task_failures'. Metric will not be reported.[] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on issue #7120: fix up RowSerializer.copy occur ClassCastException
zentol commented on issue #7120: fix up RowSerializer.copy occur ClassCastException URL: https://github.com/apache/flink/pull/7120#issuecomment-439391864 Please open a JIRA before opening a PR, and include a description of when this issue can occur. Additionally, we require a test that reproduces the issue and ensures it isn't re-introduced later on. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol closed pull request #7120: fix up RowSerializer.copy occur ClassCastException
zentol closed pull request #7120: fix up RowSerializer.copy occur ClassCastException URL: https://github.com/apache/flink/pull/7120 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index e29f6813254..5d3a9e50667 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -33,6 +33,8 @@ import java.io.IOException; import java.io.ObjectInputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.Arrays; import java.util.List; @@ -92,7 +94,12 @@ public Row copy(Row from) { Row result = new Row(len); for (int i = 0; i < len; i++) { Object fromField = from.getField(i); - if (fromField != null) { + + String fromTypeName = fromField.getClass().getTypeName(); + Type fieldSerializerType = ((ParameterizedType) fieldSerializers[i].getClass().getGenericSuperclass()).getActualTypeArguments()[0]; + String fieldSerializerTypeName = fieldSerializerType.getTypeName(); + + if (fromField != null && fieldSerializerTypeName.equals(fromTypeName)) { Object copy = fieldSerializers[i].copy(fromField); result.setField(i, copy); } else { @@ -120,12 +127,20 @@ public Row copy(Row from, Row reuse) { Object fromField = from.getField(i); if (fromField != null) { Object reuseField = reuse.getField(i); - if (reuseField != null) { + + String fromTypeName = fromField.getClass().getTypeName(); + String reuseTypeName = reuseField.getClass().getTypeName(); + Type fieldSerializerType = ((ParameterizedType) fieldSerializers[i].getClass().getGenericSuperclass()).getActualTypeArguments()[0]; + String fieldSerializerTypeName = fieldSerializerType.getTypeName(); + + if (reuseField != null && fieldSerializerTypeName.equals(fromTypeName) && fromTypeName.equals(reuseTypeName)) { Object copy = fieldSerializers[i].copy(fromField, reuseField); reuse.setField(i, copy); - } else { + } else if (fieldSerializerTypeName.equals(fromTypeName)){ Object copy = fieldSerializers[i].copy(fromField); reuse.setField(i, copy); + } else { + reuse.setField(i, null); } } else { reuse.setField(i, null); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
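For context on when such a ClassCastException can occur, here is a hypothetical repro sketch: a Row whose runtime field type contradicts the serializer's declared field type. Whether this matches the PR author's actual scenario is an assumption:

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class RowCopyMismatch {

    public static void main(String[] args) {
        // Serializer built for a single STRING field.
        TypeSerializer<Row> serializer =
            new RowTypeInfo(Types.STRING).createSerializer(new ExecutionConfig());

        Row row = new Row(1);
        row.setField(0, 42); // runtime type Integer contradicts the declared STRING field

        serializer.copy(row); // throws ClassCastException inside the field serializer
    }
}
{code}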
[GitHub] wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file systems without Hadoop dependencies
wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file systems without Hadoop dependencies URL: https://github.com/apache/flink/pull/7123 ## What is the purpose of the change This PR adds an implementation of a file system that reads from & writes to Aliyun OSS, so that users can use OSS with Flink without depending on Hadoop. In this way, users will find it easier to use OSS with Flink. This implementation wraps **AliyunOSSFileSystem** and shades its dependencies. However, the wrapped jar is not in Flink's lib directory; users need to copy the jar built into the **opt** directory to the **lib** directory. ## Brief change log - Adds **flink-filesystems/flink-oss-fs-hadoop** ## Verifying this change This implementation adds some tests for instantiation and for read, write, and list operations (testing communication with Aliyun OSS). However, in order to run these tests, one needs their own Aliyun access key ID and access key secret, and must set the environment variables below: `export ARTIFACTS_OSS_ENDPOINT=` `export ARTIFACTS_OSS_BUCKET=` `export ARTIFACTS_OSS_ACCESS_KEY=` `export ARTIFACTS_OSS_SECRET_KEY=` These tests are skipped by default. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
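As a usage sketch (not part of the PR): once the shaded jar is moved from opt to lib and credentials are configured, OSS paths should be usable like any other Flink file system URI. This assumes the connector registers the oss:// scheme analogously to the S3 connectors; the bucket and path below are made up:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OssReadJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical bucket/path; endpoint and access keys are expected to
        // come from the cluster configuration, not from code.
        env.readTextFile("oss://my-bucket/path/to/input")
           .print();

        env.execute("read-from-oss");
    }
}
{code}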
[jira] [Updated] (FLINK-10865) Implement Flink's own Aliyun OSS filesystem
[ https://issues.apache.org/jira/browse/FLINK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10865: --- Labels: pull-request-available (was: ) > Implement Flink's own Aliyun OSS filesystem > --- > > Key: FLINK-10865 > URL: https://issues.apache.org/jira/browse/FLINK-10865 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Affects Versions: 1.6.2 >Reporter: wujinhu >Priority: Major > Labels: pull-request-available > > Aliyun OSS is widely used among China’s cloud users, and Hadoop has supported > Aliyun OSS since 2.9.1. > Open this JIRA to wrap AliyunOSSFileSystem in Flink (similar to the S3 support), > so that users can read from & write to OSS more easily in Flink. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10865) Implement Flink's own Aliyun OSS filesystem
[ https://issues.apache.org/jira/browse/FLINK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689419#comment-16689419 ] ASF GitHub Bot commented on FLINK-10865: wujinhu opened a new pull request #7123: [FLINK-10865] Add Aliyun OSS file systems without Hadoop dependencies URL: https://github.com/apache/flink/pull/7123 ## What is the purpose of the change This PR adds an implementation of a file system that reads from & writes to Aliyun OSS, so that users can use OSS with Flink without depending on Hadoop. In this way, users will find it easier to use OSS with Flink. This implementation wraps **AliyunOSSFileSystem** and shades its dependencies. However, the wrapped jar is not in Flink's lib directory; users need to copy the jar built into the **opt** directory to the **lib** directory. ## Brief change log - Adds **flink-filesystems/flink-oss-fs-hadoop** ## Verifying this change This implementation adds some tests for instantiation and for read, write, and list operations (testing communication with Aliyun OSS). However, in order to run these tests, one needs their own Aliyun access key ID and access key secret, and must set the environment variables below: `export ARTIFACTS_OSS_ENDPOINT=` `export ARTIFACTS_OSS_BUCKET=` `export ARTIFACTS_OSS_ACCESS_KEY=` `export ARTIFACTS_OSS_SECRET_KEY=` These tests are skipped by default. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement Flink's own Aliyun OSS filesystem > --- > > Key: FLINK-10865 > URL: https://issues.apache.org/jira/browse/FLINK-10865 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Affects Versions: 1.6.2 >Reporter: wujinhu >Priority: Major > Labels: pull-request-available > > Aliyun OSS is widely used among China’s cloud users, and Hadoop has supported > Aliyun OSS since 2.9.1. > Open this JIRA to wrap AliyunOSSFileSystem in Flink (similar to the S3 support), > so that users can read from & write to OSS more easily in Flink. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xymaqingxiang updated FLINK-10908: -- Description: Add yarn.application.priority in YarnConfigOptions, which changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW. (was: Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW) > Add yarn.application.priority in YarnConfigOptions > -- > > Key: FLINK-10908 > URL: https://issues.apache.org/jira/browse/FLINK-10908 > Project: Flink > Issue Type: Improvement >Reporter: xymaqingxiang >Priority: Major > Labels: pull-request-available > > Add yarn.application.priority in YarnConfigOptions, which changes the priority of > the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
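A minimal sketch of what such an option could look like in YarnConfigOptions; the field name and the -1 default (meaning: defer to YARN's cluster default) are assumptions, not the merged implementation:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class YarnPriorityOptionSketch {

    // Hypothetical option definition; -1 would let YARN pick the default priority.
    public static final ConfigOption<Integer> APPLICATION_PRIORITY =
        ConfigOptions.key("yarn.application.priority")
            .defaultValue(-1);
}
{code}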
[jira] [Commented] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689246#comment-16689246 ] ASF GitHub Bot commented on FLINK-10908: maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add yarn.application.priority in YarnConfigOptions URL: https://github.com/apache/flink/pull/7122 ## What is the purpose of the change Add support for setting the priority of the Flink YARN application. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add yarn.application.priority in YarnConfigOptions > -- > > Key: FLINK-10908 > URL: https://issues.apache.org/jira/browse/FLINK-10908 > Project: Flink > Issue Type: Improvement >Reporter: xymaqingxiang >Priority: Major > Labels: pull-request-available > > Add yarn.application.priority in YarnConfigOptions, which changes the priority of > the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add yarn.application.priority in YarnConfigOptions
maqingxiang opened a new pull request #7122: [FLINK-10908][flink-yarn]Add yarn.application.priority in YarnConfigOptions URL: https://github.com/apache/flink/pull/7122 ## What is the purpose of the change Add support for setting the priority of the Flink YARN application. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10908: --- Labels: pull-request-available (was: ) > Add yarn.application.priority in YarnConfigOptions > -- > > Key: FLINK-10908 > URL: https://issues.apache.org/jira/browse/FLINK-10908 > Project: Flink > Issue Type: Improvement >Reporter: xymaqingxiang >Priority: Major > Labels: pull-request-available > > Add yarn.application.priority in YarnConfigOptions, which changes the priority of > the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10908) Add yarn.application.priority in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-10908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xymaqingxiang updated FLINK-10908: -- Description: Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW > Add yarn.application.priority in YarnConfigOptions > -- > > Key: FLINK-10908 > URL: https://issues.apache.org/jira/browse/FLINK-10908 > Project: Flink > Issue Type: Improvement >Reporter: xymaqingxiang >Priority: Major > > > Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, > NORMAL, LOW, VERY_LOW -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10909) Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5
Mohamed Amine ABDESSEMED created FLINK-10909: Summary: Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5 Key: FLINK-10909 URL: https://issues.apache.org/jira/browse/FLINK-10909 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.6.2 Reporter: Mohamed Amine ABDESSEMED Hello, We suspect a RocksDBStateBackend performance regression after upgrading to flink-1.6.1 (rocksdbjni-5.7.5), similar to https://issues.apache.org/jira/browse/FLINK-5756 and [https://github.com/facebook/rocksdb/issues/1988] Here are the test results using the test code provided by [~StephanEwen] in [https://github.com/facebook/rocksdb/issues/1988] Test results with rocksdbjni-5.7.5: begin insert end insert - duration: *54 ms* end get - duration: *37247 ms* Test results with frocksdbjni-4.11.2-artisans: begin insert end insert - duration: *53 ms* end get - duration: *4 ms* Can anyone confirm? Regards, Amine -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10909) Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5
[ https://issues.apache.org/jira/browse/FLINK-10909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689332#comment-16689332 ] Stefan Richter commented on FLINK-10909: The test you mentioned is part of the Flink test suite; specifically, it became {{RocksDBPerformanceTest#testRocksDbMergePerformance}}. This test is passing, and in my opinion there is no sign of a performance regression. > Possible RocksDBStateBackend performance regression with rocksdbjni-5.7.5 > - > > Key: FLINK-10909 > URL: https://issues.apache.org/jira/browse/FLINK-10909 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.2 >Reporter: Mohamed Amine ABDESSEMED >Priority: Major > > Hello, > We suspect a RocksDBStateBackend performance regression after upgrading to > flink-1.6.1 (rocksdbjni-5.7.5), > similar to https://issues.apache.org/jira/browse/FLINK-5756 and > [https://github.com/facebook/rocksdb/issues/1988] > Here are the test results using the test code provided by [~StephanEwen] in > [https://github.com/facebook/rocksdb/issues/1988] > Test results with rocksdbjni-5.7.5: > begin insert > end insert - duration: *54 ms* > end get - duration: *37247 ms* > Test results with frocksdbjni-4.11.2-artisans: > begin insert > end insert - duration: *53 ms* > end get - duration: *4 ms* > Can anyone confirm? > Regards, > Amine -- This message was sent by Atlassian JIRA (v7.6.3#76005)
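For reference, a sketch in the spirit of the micro-benchmark discussed above: many cheap merge() appends followed by a single get() that must resolve all accumulated operands. Key/value sizes, iteration count, and the path are made up and differ from the actual RocksDBPerformanceTest:

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.WriteOptions;

public class MergeReadBenchmark {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                 .setCreateIfMissing(true)
                 .setMergeOperator(new StringAppendOperator()); // list-state style appends
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-merge-bench"); // path made up
             WriteOptions writeOptions = new WriteOptions()) {

            byte[] key = "key".getBytes();
            byte[] value = "abcdefghijklmnopqrstuvwxyz".getBytes();

            long t0 = System.currentTimeMillis();
            for (int i = 0; i < 100_000; i++) {
                db.merge(writeOptions, key, value); // cheap: records a merge operand
            }
            long t1 = System.currentTimeMillis();
            db.get(key); // expensive: resolves all operands in one read
            long t2 = System.currentTimeMillis();

            System.out.println("insert: " + (t1 - t0) + " ms, get: " + (t2 - t1) + " ms");
        }
    }
}
{code}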
[GitHub] asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields errors in special config
asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields errors in special config URL: https://github.com/apache/flink/pull/6900 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 19e030b4c0a..8f9913bdfaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1057,13 +1057,13 @@ abstract class CodeGenerator( // declaration val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType) -if (nullCheck) { +if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { reusableMemberStatements.add(s"private boolean ${expr.nullTerm};") } reusableMemberStatements.add(s"private $resultTypeTerm ${expr.resultTerm};") // assignment -if (nullCheck) { +if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { reusablePerRecordStatements.add(s"this.${expr.nullTerm} = ${expr.nullTerm};") } reusablePerRecordStatements.add(s"this.${expr.resultTerm} = ${expr.resultTerm};") diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index b161eeda43c..4e880eeaf3b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -578,6 +578,22 @@ class CalcITCase( val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n") TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testSplitFeildsOnCustomType(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) +tEnv.getConfig.setMaxGeneratedCodeLength(1) // splits fields + +val ds = CollectionDataSets.getCustomTypeDataSet(env) +val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's) + .filter( 's.like("%a%") && 's.charLength > 12) + .select('i, 'l, 's.charLength) + +val expected = "3,3,25\n" + "3,5,14\n" +val results = filterDs.toDataSet[Row].collect() +TestBaseUtils.compareResultAsText(results.asJava, expected) + } } object CalcITCase { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1
[ https://issues.apache.org/jira/browse/FLINK-10642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689453#comment-16689453 ] ASF GitHub Bot commented on FLINK-10642: asfgit closed pull request #6900: [FLINK-10642][table] fix CodeGen split fields errors in special config URL: https://github.com/apache/flink/pull/6900 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 19e030b4c0a..8f9913bdfaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1057,13 +1057,13 @@ abstract class CodeGenerator( // declaration val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType) -if (nullCheck) { +if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { reusableMemberStatements.add(s"private boolean ${expr.nullTerm};") } reusableMemberStatements.add(s"private $resultTypeTerm ${expr.resultTerm};") // assignment -if (nullCheck) { +if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { reusablePerRecordStatements.add(s"this.${expr.nullTerm} = ${expr.nullTerm};") } reusablePerRecordStatements.add(s"this.${expr.resultTerm} = ${expr.resultTerm};") diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index b161eeda43c..4e880eeaf3b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -578,6 +578,22 @@ class CalcITCase( val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n") TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testSplitFeildsOnCustomType(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) +tEnv.getConfig.setMaxGeneratedCodeLength(1) // splits fields + +val ds = CollectionDataSets.getCustomTypeDataSet(env) +val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's) + .filter( 's.like("%a%") && 's.charLength > 12) + .select('i, 'l, 's.charLength) + +val expected = "3,3,25\n" + "3,5,14\n" +val results = filterDs.toDataSet[Row].collect() +TestBaseUtils.compareResultAsText(results.asJava, expected) + } } object CalcITCase { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > CodeGen split fields errors when maxGeneratedCodeLength equals 1 > > > Key: FLINK-10642 > URL: https://issues.apache.org/jira/browse/FLINK-10642 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > > Several tests fail under a special config, when setting maxGeneratedCodeLength to 1. > e.g. > CalcITCase.testFilterOnCustomType:260 ? InvalidProgram Table program cannot > be... > JavaTableEnvironmentITCase.testAsFromAndToPojo:394 ? InvalidProgram Table > prog... > JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 ? > InvalidProgram > JavaTableEnvironmentITCase.testAsFromPojo:288 ? InvalidProgram Table > program c... > JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 ? InvalidProgram > Ta... > JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 ? > InvalidProgram ... > TimeAttributesITCase.testPojoSupport:566 ? JobExecution Job execution > failed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10642) CodeGen split fields errors when maxGeneratedCodeLength equals 1
[ https://issues.apache.org/jira/browse/FLINK-10642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10642. -- Resolution: Fixed Fix Version/s: 1.7.0 1.6.3 1.5.6 Fixed in master: a7ae4253e4463ff6242f69d8fdb1c4a76af0f14c Fixed in 1.7.0: 146f4340ed02cd2337c42b1480fd9c8b070997ad Fixed in 1.6.3: e8fc37e8aab27b5ee468f444d4c9f13927552166 Fixed in 1.5.6: 1d8bc86d152cde2941750c99f6cfd2507c96afdb > CodeGen split fields errors when maxGeneratedCodeLength equals 1 > > > Key: FLINK-10642 > URL: https://issues.apache.org/jira/browse/FLINK-10642 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Several tests fail under a special config, when setting maxGeneratedCodeLength to 1. > e.g. > CalcITCase.testFilterOnCustomType:260 ? InvalidProgram Table program cannot > be... > JavaTableEnvironmentITCase.testAsFromAndToPojo:394 ? InvalidProgram Table > prog... > JavaTableEnvironmentITCase.testAsFromAndToPrivateFieldPojo:421 ? > InvalidProgram > JavaTableEnvironmentITCase.testAsFromPojo:288 ? InvalidProgram Table > program c... > JavaTableEnvironmentITCase.testAsFromPrivateFieldsPojo:366 ? InvalidProgram > Ta... > JavaTableEnvironmentITCase.testAsWithPojoAndGenericTypes:453 ? > InvalidProgram ... > TimeAttributesITCase.testPojoSupport:566 ? JobExecution Job execution > failed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr closed pull request #3830: [hotfix] [table] Optimize aggregate function get type
twalthr closed pull request #3830: [hotfix] [table] Optimize aggregate function get type URL: https://github.com/apache/flink/pull/3830 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition
twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition URL: https://github.com/apache/flink/pull/3569 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 760cf7588f4..1b07fd8998d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -18,7 +18,9 @@ package org.apache.flink.table.api +import com.google.common.base.Joiner import org.apache.flink.table.catalog.TableSourceConverter +import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec /** * Exception for all errors occurring during expression parsing. @@ -74,6 +76,50 @@ object ValidationException { */ case class UnresolvedException(msg: String) extends RuntimeException(msg) +/** + * Exception for an operation on a nonexistent partition + * + * @param dbdatabase name + * @param table table name + * @param partitionSpec partition spec + * @param cause the cause + */ +case class PartitionNotExistException( +db: String, +table: String, +partitionSpec: PartitionSpec, +cause: Throwable) +extends RuntimeException( + s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " + + s"does not exist in table $db.$table!", cause) { + + def this(db: String, table: String, partitionSpec: PartitionSpec) = +this(db, table, partitionSpec, null) + +} + +/** + * Exception for adding an already existent partition + * + * @param dbdatabase name + * @param table table name + * @param partitionSpec partition spec + * @param cause the cause + */ +case class PartitionAlreadyExistException( +db: String, +table: String, +partitionSpec: PartitionSpec, +cause: Throwable) +extends RuntimeException( + s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " + + s"already exists in table $db.$table!", cause) { + + def this(db: String, table: String, partitionSpec: PartitionSpec) = +this(db, table, partitionSpec, null) + +} + /** * Exception for an operation on a nonexistent table * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala index fcefa45fcc5..78798ce0b72 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala @@ -19,12 +19,81 @@ package org.apache.flink.table.catalog import org.apache.flink.table.api._ +import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec /** * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables. */ trait CrudExternalCatalog extends ExternalCatalog { + /** +* Adds a partition to the catalog. +* +* @param dbName The name of the table's database. +* @param tableName The name of the table. +* @param part Description of the partition to add. 
+* @param ignoreIfExists Flag to specify behavior if a partition with the given spec +* already exists: +* if set to false, it throws a PartitionAlreadyExistException, +* if set to true, nothing happens. +* @throws DatabaseNotExistException thrown if the database does not exist in the catalog. +* @throws TableNotExistException thrown if the table does not exist in the catalog. +* @throws PartitionAlreadyExistException thrown if the partition already exists and +*ignoreIfExists is false +*/ + @throws[DatabaseNotExistException] + @throws[TableNotExistException] + @throws[PartitionAlreadyExistException] + def createPartition( + dbName: String, + tableName: String, + part: ExternalCatalogTablePartition, + ignoreIfExists: Boolean): Unit + + /** +* Deletes partition from a database of the catalog. +* +* @param dbNameThe name of the table's database. +* @param tableName The name of the table. +* @param partSpec Description of the partition to add. +* @param ignoreIfNotExists Flag to specify behavior if the partition does not exist: +*
[GitHub] tzulitai opened a new pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
tzulitai opened a new pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124 ## What is the purpose of the change This PR adds a new "State Schema Evolution" page under "State & Fault Tolerance". It also reworks the "Custom State Serialization" page to reflect the new serializer / serializer snapshot abstractions in 1.7. - The "State Schema Evolution" page is intended for the majority of users who do not use custom serializers and just want to know which state types to use for an evolvable schema, and what the limitations are. The list of supported types currently only includes Avro, because 1.7 only supports Avro schema evolution. - The "Custom State Serialization" page is intended for power users who implement their own state serializers. It explains the abstractions and how Flink interacts with them. The document is also targeted at Flink developers who might implement Flink-shipped serializers. ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
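To make the reworked abstractions concrete, a minimal snapshot skeleton for a serializer without configurable state, using StringSerializer purely as a stand-in; treat the method-level details as assumptions against the 1.7 interfaces the PR documents:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class MyStringSerializerSnapshot implements TypeSerializerSnapshot<String> {

    private static final int CURRENT_VERSION = 1;

    @Override
    public int getCurrentVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        // nothing to persist: this serializer carries no configuration
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
            throws IOException {
        // nothing to read back; dispatch on readVersion once the format evolves
    }

    @Override
    public TypeSerializer<String> restoreSerializer() {
        return StringSerializer.INSTANCE;
    }

    @Override
    public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(
            TypeSerializer<String> newSerializer) {
        // compare the restored configuration against the new serializer's
        return newSerializer instanceof StringSerializer
            ? TypeSerializerSchemaCompatibility.compatibleAsIs()
            : TypeSerializerSchemaCompatibility.incompatible();
    }
}
{code}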
[jira] [Updated] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-6036: -- Labels: pull-request-available (was: ) > Let catalog support partition > - > > Key: FLINK-6036 > URL: https://issues.apache.org/jira/browse/FLINK-6036 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: jingzhang >Assignee: jingzhang >Priority: Major > Labels: pull-request-available > > Currently the catalog only supports CRUD at the database and table level. But in some kinds > of catalogs, for example Hive, we also need to do CRUD operations at the > partition level. > This issue aims to let the catalog support partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9574: -- Labels: pull-request-available (was: ) > Add a dedicated documentation page for state evolution > -- > > Key: FLINK-9574 > URL: https://issues.apache.org/jira/browse/FLINK-9574 > Project: Flink > Issue Type: Sub-task > Components: Documentation, State Backends, Checkpointing, Type > Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the only bit of documentation about serializer upgrades / state > evolution is > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.], > which only explains things at an API level. > State evolution has over time proved to be a rather complex topic that is > often overlooked by users. Users would probably benefit from an actual > full-grown dedicated page that covers both the API, some necessary internal > details regarding the interplay of state serializers, best practices, and caution > notices. > I propose to add this documentation as a subpage under Streaming/State & > Fault-Tolerance/. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689502#comment-16689502 ] ASF GitHub Bot commented on FLINK-9574: --- tzulitai opened a new pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124 ## What is the purpose of the change This PR adds a new "State Schema Evolution" page under "State & Fault Tolerance". It also reworks the "Custom State Serialization" page to reflect the new serializer / serializer snapshot abstractions in 1.7. - The "State Schema Evolution" page is intended for the majority of users who do not use custom serializers and just want to know which state types to use for an evolvable schema, and what the limitations are. The list of supported types currently only includes Avro, because 1.7 only supports Avro schema evolution. - The "Custom State Serialization" page is intended for power users who implement their own state serializers. It explains the abstractions and how Flink interacts with them. The document is also targeted at Flink developers who might implement Flink-shipped serializers. ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a dedicated documentation page for state evolution > -- > > Key: FLINK-9574 > URL: https://issues.apache.org/jira/browse/FLINK-9574 > Project: Flink > Issue Type: Sub-task > Components: Documentation, State Backends, Checkpointing, Type > Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the only bit of documentation about serializer upgrades / state > evolution is > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.], > which only explains things at an API level. > State evolution has over time proved to be a rather complex topic that is > often overlooked by users. Users would probably benefit from an actual > full-grown dedicated page that covers both the API, some necessary internal > details regarding the interplay of state serializers, best practices, and caution > notices. > I propose to add this documentation as a subpage under Streaming/State & > Fault-Tolerance/. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6457) Clean up ScalarFunction and TableFunction interface
[ https://issues.apache.org/jira/browse/FLINK-6457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689515#comment-16689515 ] ASF GitHub Bot commented on FLINK-6457: --- twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface URL: https://github.com/apache/flink/pull/3880 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 9f50f0c806a..1aab7ae0993 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -68,6 +68,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable.HashMap import _root_.scala.annotation.varargs +import _root_.scala.util.{Try, Success, Failure} /** * The abstract base class for batch and stream TableEnvironments. @@ -340,10 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) { // check if class could be instantiated checkForInstantiation(function.getClass) -val typeInfo: TypeInformation[_] = if (function.getResultType != null) { - function.getResultType -} else { - implicitly[TypeInformation[T]] +val typeInfo: TypeInformation[_] = Try { + function.getClass.getDeclaredMethod("getResultType") +} match { + case Success(m) => m.invoke(function).asInstanceOf[TypeInformation[_]] + case Failure(_) => implicitly[TypeInformation[T]] } // register in Table API diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala new file mode 100644 index 000..d781e76071d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall} +import org.apache.flink.table.functions.ScalarFunction + +/** + * Holds methods to convert a [[ScalarFunction]] call + * in the Scala Table API into a [[ScalarFunctionCall]]. + * + * @param sf The ScalarFunction to convert. 
+ */ +class ScalarFunctionConversions(sf: ScalarFunction) { + /** +* Creates a [[ScalarFunctionCall]] in Scala Table API. +* +* @param params actual parameters of function +* @return [[Expression]] in form of a [[ScalarFunctionCall]] +*/ + final def apply(params: Expression*): Expression = { +ScalarFunctionCall(sf, params) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala index 692876fe2da..292e241077a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala @@ -24,6 +24,8 @@ import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.plan.logical.LogicalTableFunctionCall +import scala.util.{Failure, Success, Try} + /** * Holds methods to convert a [[TableFunction]] call in the Scala Table API into a [[Table]]. * @@ -39,7 +41,12 @@ class TableFunctionConversions[T](tf: TableFunction[T]) { */ final def apply(args:
[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature
[ https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689568#comment-16689568 ] ASF GitHub Bot commented on FLINK-10900: pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mark Kafka 2.0 connector as beta feature > > > Key: FLINK-10900 > URL: https://issues.apache.org/jira/browse/FLINK-10900 > Project: Flink > Issue Type: Task > Components: Kafka Connector >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Given the test problems with the Kafka 2.0 connector, we should mark this > connector as a beta feature until we have fully understood why so many tests > deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature
[ https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689569#comment-16689569 ] ASF GitHub Bot commented on FLINK-10900: pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mark Kafka 2.0 connector as beta feature > > > Key: FLINK-10900 > URL: https://issues.apache.org/jira/browse/FLINK-10900 > Project: Flink > Issue Type: Task > Components: Kafka Connector >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Given the test problems with the Kafka 2.0 connector, we should mark this > connector as a beta feature until we have fully understood why so many tests > deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible
twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439437076 Thanks @pnowojski. Will merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689586#comment-16689586 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439437076 Thanks @pnowojski. Will merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal; thus we should support > at least the 1.x and 2.x versions as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
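A hedged Java-side sketch of what the more flexible versioning allows, assuming the 1.7-era table descriptor API and that "universal" is the accepted string for the version-agnostic connector; topic and broker address are placeholders:
{code:java}
import org.apache.flink.table.descriptors.Kafka;

public class KafkaDescriptorSketch {
    public static Kafka universalKafka() {
        // "universal" stands in for the version-agnostic connector,
        // replacing fixed strings such as "0.11" or "2.0".
        return new Kafka()
            .version("universal")
            .topic("sketch-topic")
            .property("bootstrap.servers", "localhost:9092");
    }
}
{code}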
[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution
[ https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689606#comment-16689606 ] TisonKun commented on FLINK-10880: -- [~till.rohrmann] alright, I agree with your quick fix and hope we can make the release ASAP :-) > Failover strategies should not be applied to Batch Execution > > > Key: FLINK-10880 > URL: https://issues.apache.org/jira/browse/FLINK-10880 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.6.2 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.6.3, 1.7.0 > > > When configuring a failover strategy other than "full", DataSet/Batch > execution is currently not correct. > This is expected, the failover region strategy is an experimental WIP feature > for streaming that has not been extended to the DataSet API. > We need to document this and prevent execution of DataSet programs with > failover strategies other than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
[ https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689607#comment-16689607 ] ASF GitHub Bot commented on FLINK-10872: asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 URL: https://github.com/apache/flink/pull/7100 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index 6b02aba4bb3..db1fca6df49 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -191,14 +191,13 @@ under the License. sql-jar jar - + org.apache.flink flink-connector-elasticsearch6_${scala.binary.version} diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 6d4560864f8..9d51d03c7e4 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -143,6 +143,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh" run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh" +run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh" run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh" run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh new file mode 100755 index 000..8d3af0f7925 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +set -Eeuo pipefail + +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.11 0.11.0.2 3.2.0 3.2 "kafka-0.11" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 > > > Key: FLINK-10872 > URL: https://issues.apache.org/jira/browse/FLINK-10872 > Project: Flink > Issue Type: Test > Components: E2E Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
asfgit closed pull request #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 URL: https://github.com/apache/flink/pull/7100 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index 6b02aba4bb3..db1fca6df49 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -191,14 +191,13 @@ under the License. sql-jar jar - + org.apache.flink flink-connector-elasticsearch6_${scala.binary.version} diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 6d4560864f8..9d51d03c7e4 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -143,6 +143,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh" run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh" +run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh" run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh" run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh new file mode 100755 index 000..8d3af0f7925 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka011.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +set -Eeuo pipefail + +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.11 0.11.0.2 3.2.0 3.2 "kafka-0.11" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689665#comment-16689665 ] ASF GitHub Bot commented on FLINK-10455: pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7108#issuecomment-439457861 Thanks @azagrebin merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get a {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
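To make the timeout interplay from the description concrete, a hedged sketch of configuring the 0.11 producer: the transaction timeout should comfortably exceed the checkpoint interval (the broker's transaction.max.timeout.ms must permit the raised value); broker address and topic are placeholders:
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class TransactionalProducerSketch {
    public static FlinkKafkaProducer011<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Keep the transaction timeout well above the checkpoint interval,
        // e.g. one hour, so pending transactions are not fenced mid-checkpoint.
        props.setProperty("transaction.timeout.ms", "3600000");

        return new FlinkKafkaProducer011<>(
            "sketch-topic",                                            // placeholder topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}
{code}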
[GitHub] pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination
pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7108#issuecomment-439457861 Thanks @azagrebin merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689664#comment-16689664 ] ASF GitHub Bot commented on FLINK-10455: pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7107#issuecomment-439457778 Thanks @azagrebin, merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get a {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation
tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation URL: https://github.com/apache/flink/pull/7129#discussion_r234281426 ## File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ## @@ -106,6 +106,7 @@ /** * This option specifies the failover strategy, i.e. how the job computation recovers from task failures. */ + @Documentation.ExcludeFromDocumentation Review comment: Good idea. Will change it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Reopened] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values
[ https://issues.apache.org/jira/browse/FLINK-10907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-10907: -- The issue may affect the legacy mode in 1.5/1.6. > Job recovery on the same JobManager causes JobManager metrics to report stale > values > > > Key: FLINK-10907 > URL: https://issues.apache.org/jira/browse/FLINK-10907 > Project: Flink > Issue Type: Bug > Components: Core, Metrics >Affects Versions: 1.4.2 > Environment: Verified the bug and the fix running on Flink 1.4 > Based on the JobManagerMetricGroup.java code in master, this issue should > still occur on Flink versions after 1.4. >Reporter: Mark Cho >Priority: Minor > Labels: pull-request-available > > https://github.com/apache/flink/pull/7119 > * JobManager loses and regains leadership if it loses connection and > reconnects to ZooKeeper. > * When it regains the leadership, it tries to recover the job graph. > * During the recovery, it will try to reuse the existing > {{JobManagerMetricGroup}} to register new counters and gauges under the same > metric name, which causes the new counters and gauges to be registered > incorrectly. > * The old counters and gauges will continue to > report the stale values and the new counters and gauges will not report > the latest metric. > Relevant lines from logs > {code:java} > com.---.JobManager - Submitting recovered job > e9e49fd9b8c61cf54b435f39aa49923f. > com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f > (flink-job) (Recovery). > com.---.JobManager - Running initialization on master for job flink-job > (e9e49fd9b8c61cf54b435f39aa49923f). > com.---.JobManager - Successfully ran initialization on master in 0 ms. > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointDuration'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'lastCheckpointExternalPath'. Metric will not be > reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'restartingTime'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'downtime'. 
Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'uptime'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'fullRestarts'. Metric will not be reported.[] > org.apache.flink.metrics.MetricGroup - Name collision: Group already contains > a Metric with the name 'task_failures'. Metric will not be reported.[] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
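A hedged sketch reproducing the collision pattern behind those log lines: registering a second metric under an existing name in the same group leaves the first registration in place; the metric name reuses one from the report, the function itself is hypothetical:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class MetricCollisionSketch extends RichMapFunction<Long, Long> {
    @Override
    public void open(Configuration parameters) {
        MetricGroup group = getRuntimeContext().getMetricGroup();
        group.gauge("uptime", (Gauge<Long>) () -> 1L);
        // Triggers "Name collision: Group already contains a Metric with the
        // name 'uptime'." -- the second gauge is dropped, the first keeps reporting.
        group.gauge("uptime", (Gauge<Long>) () -> 2L);
    }

    @Override
    public Long map(Long value) {
        return value;
    }
}
{code}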
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689517#comment-16689517 ] ASF GitHub Bot commented on FLINK-10869: zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234231999 ## File path: flink-end-to-end-tests/test-scripts/common_s3.sh ## @@ -68,8 +68,8 @@ function s3_setup { trap s3_cleanup EXIT cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/ - echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml" Review comment: let's either change these to use the variables defined at the top (`AWS_REGION`, `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`), or remove said variables since they are unused. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.
zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234231999 ## File path: flink-end-to-end-tests/test-scripts/common_s3.sh ## @@ -68,8 +68,8 @@ function s3_setup { trap s3_cleanup EXIT cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/ - echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml" Review comment: let's either change these to use the variables defined at the top (`AWS_REGION`, `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`), or remove said variables since they are unused. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.
zentol commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234230495 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java ## @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException { @Test(expected = UnsupportedOperationException.class) public void requestingRecoverableWriterShouldThroughException() throws Exception { - FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri()); + URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri()); Review comment: This is not using `TEST_DATA_DIR` anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689528#comment-16689528 ] ASF GitHub Bot commented on FLINK-10869: StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234236294 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java ## @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException { @Test(expected = UnsupportedOperationException.class) public void requestingRecoverableWriterShouldThroughException() throws Exception { - FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri()); + URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri()); Review comment: It never used the `TEST_DATA_DIR` - the `getFileSystem()` only uses scheme and authority, path does not matter. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
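A small illustration of that point, with a hypothetical bucket name: the path component of the URI plays no role in resolving the file system, so both lookups below yield a file system for the same bucket (an S3 file system implementation must be on the classpath):
{code:java}
import java.io.IOException;
import java.net.URI;

import org.apache.flink.core.fs.FileSystem;

public class FileSystemLookupSketch {
    public static void main(String[] args) throws IOException {
        // Both URIs share scheme "s3" and authority "test-bucket";
        // the differing paths are ignored by the lookup.
        FileSystem fs1 = FileSystem.get(URI.create("s3://test-bucket/some/path"));
        FileSystem fs2 = FileSystem.get(URI.create("s3://test-bucket/other/path"));
        System.out.println(fs1.getUri().equals(fs2.getUri())); // expected: true
    }
}
{code}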
[jira] [Commented] (FLINK-10906) docker-entrypoint.sh logs credentials during startup
[ https://issues.apache.org/jira/browse/FLINK-10906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689574#comment-16689574 ] ASF GitHub Bot commented on FLINK-10906: tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh URL: https://github.com/apache/flink/pull/7125 ## What is the purpose of the change In order to not leak secrets we should not print the configuration in docker-entrypoint.sh. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > docker-entrypoint.sh logs credentials during startup > > > Key: FLINK-10906 > URL: https://issues.apache.org/jira/browse/FLINK-10906 > Project: Flink > Issue Type: Improvement > Components: Docker >Reporter: Konstantin Knauf >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > > {{docker-entrypoint.sh}} prints the full Flink configuration file including > potentially sensitive configuration entries. > To be consistent we should hide these as in {{GlobalConfiguration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
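A hedged sketch of the masking idea the issue refers to, assuming GlobalConfiguration.isSensitive(String) is available with this signature (it is believed to match key fragments such as "password" and "secret"); the printed keys and values are placeholders:
{code:java}
import org.apache.flink.configuration.GlobalConfiguration;

public class ConfigMaskingSketch {
    // Mirror the hiding behavior when echoing configuration at startup.
    // Assumption: GlobalConfiguration.isSensitive exists with this signature.
    static String displayValue(String key, String value) {
        return GlobalConfiguration.isSensitive(key) ? "******" : value;
    }

    public static void main(String[] args) {
        // "s3.secret-key" contains "secret", so its value would be hidden.
        System.out.println("s3.secret-key: " + displayValue("s3.secret-key", "placeholder"));
        System.out.println("parallelism.default: " + displayValue("parallelism.default", "4"));
    }
}
{code}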
[GitHub] tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh
tillrohrmann opened a new pull request #7125: [FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh URL: https://github.com/apache/flink/pull/7125 ## What is the purpose of the change In order to not leak secrets we should not print the configuration in docker-entrypoint.sh. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10900) Mark Kafka 2.0 connector as beta feature
[ https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10900. -- Resolution: Fixed merged commit 1680132 into apache:master commits 146f4340ed..d02af7eda3 into apache:release-1.7 > Mark Kafka 2.0 connector as beta feature > > > Key: FLINK-10900 > URL: https://issues.apache.org/jira/browse/FLINK-10900 > Project: Flink > Issue Type: Task > Components: Kafka Connector >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Given the test problems with the Kafka 2.0 connector we should mark this > connector as a beta feature until we have fully understood why so many tests > deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10843. -- Resolution: Fixed Fix Version/s: 1.7.0 Fixed in master: ad7e81ae3642f7d1f695a6f2474c59a81bb83949 Fixed in 1.7.0: d2c2773972fa0b4c5a348d2f885b6c9956432c18 > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal; thus we should support > at least the 1.x and 2.x versions as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
[ https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10872. -- Resolution: Fixed Fix Version/s: 1.7.0 Fixed in master: acd041cca5d591176aafc7de5d62db45c13b7375 Fixed in 1.7.0: 1891949d2584e67a5edb878eac62268c447fdc1f > Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 > > > Key: FLINK-10872 > URL: https://issues.apache.org/jira/browse/FLINK-10872 > Project: Flink > Issue Type: Test > Components: E2E Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination
pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7108 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index adecab16d8c..d332cd0ca6a 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception { } // make sure we propagate pending errors checkErroneous(); + pendingTransactions().forEach(transaction -> + IOUtils.closeQuietly(transaction.getValue().producer) + ); } // --- Logic for handling checkpoint flushing -- // @@ -713,8 +717,11 @@ protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011 @Override protected void commit(KafkaTransactionState transaction) { if (transaction.isTransactional()) { - transaction.producer.commitTransaction(); - recycleTransactionalProducer(transaction.producer); + try { + transaction.producer.commitTransaction(); + } finally { + recycleTransactionalProducer(transaction.producer); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 8398aa8cdbf..f2725242b2c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -38,20 +38,24 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.time.Clock; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import static java.util.Objects.requireNonNull; import static org.apache.flink.util.Preconditions.checkArgument; @@ -149,6 +153,12 @@ protected TXN currentTransaction() { return currentTransactionHolder == null ? 
null : currentTransactionHolder.handle; } + @Nonnull + protected Stream> pendingTransactions() { + return pendingCommitTransactions.entrySet().stream() + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().handle)); + } + // -- methods that should be implemented in child class to support two phase commit algorithm -- /** @@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception { Iterator>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator(); checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); + Throwable firstError = null; while (pendingTransactionIterator.hasNext()) { Map.Entry> entry = pendingTransactionIterator.next(); @@ -269,12 +280,23 @@ public final void
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689662#comment-16689662 ] ASF GitHub Bot commented on FLINK-10455: pnowojski closed pull request #7108: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7108 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index adecab16d8c..d332cd0ca6a 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception { } // make sure we propagate pending errors checkErroneous(); + pendingTransactions().forEach(transaction -> + IOUtils.closeQuietly(transaction.getValue().producer) + ); } // --- Logic for handling checkpoint flushing -- // @@ -713,8 +717,11 @@ protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011 @Override protected void commit(KafkaTransactionState transaction) { if (transaction.isTransactional()) { - transaction.producer.commitTransaction(); - recycleTransactionalProducer(transaction.producer); + try { + transaction.producer.commitTransaction(); + } finally { + recycleTransactionalProducer(transaction.producer); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 8398aa8cdbf..f2725242b2c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -38,20 +38,24 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.time.Clock; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import static 
java.util.Objects.requireNonNull; import static org.apache.flink.util.Preconditions.checkArgument; @@ -149,6 +153,12 @@ protected TXN currentTransaction() { return currentTransactionHolder == null ? null : currentTransactionHolder.handle; } + @Nonnull + protected Stream> pendingTransactions() { + return pendingCommitTransactions.entrySet().stream() + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().handle)); + } + // -- methods that should be implemented in child class to support two phase commit algorithm -- /** @@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception { Iterator>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator(); checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); +
[jira] [Closed] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10455. -- Resolution: Fixed Fix Version/s: 1.6.3 1.5.6 merged commit e493d83 into apache:release-1.5 merged commit 489be82 into apache:release-1.6 > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get a {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution
[ https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689661#comment-16689661 ] ASF GitHub Bot commented on FLINK-10880: tillrohrmann opened a new pull request #7129: [FLINK-10880] Exclude JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation URL: https://github.com/apache/flink/pull/7129 ## What is the purpose of the change This commit excludes the `JobManagerOptions#EXECUTION_FAILOVER_STRATEGY` from Flink's configuration documentation. ## Brief change log - Introduce `Documentation#ExcludeFromDocumentation` - Exclude `JobManagerOptions#EXECUTION_FAILOVER_STRATEGY` from documentation ## Verifying this change - Added `ConfigOptionsDocGeneratorTest#testConfigOptionExclusion` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Failover strategies should not be applied to Batch Execution > > > Key: FLINK-10880 > URL: https://issues.apache.org/jira/browse/FLINK-10880 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.6.2 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.6.3, 1.7.0 > > > When configuring a failover strategy other than "full", DataSet/Batch > execution is currently not correct. > This is expected, the failover region strategy is an experimental WIP feature > for streaming that has not been extended to the DataSet API. > We need to document this and prevent execution of DataSet programs with > failover strategies other than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
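A hedged sketch of applying the new annotation to a hypothetical option; the option key is invented for illustration, and the import path for `Documentation` is an assumption (the annotation itself appears in the PR diff above):
{code:java}
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ExperimentalOptions {
    /** A hypothetical WIP option kept out of the generated configuration docs. */
    @Documentation.ExcludeFromDocumentation
    public static final ConfigOption<String> EXPERIMENTAL_STRATEGY =
        ConfigOptions.key("experimental.failover-strategy")
            .defaultValue("full");
}
{code}
The docs generator is then expected to skip any `ConfigOption` field carrying this annotation, which is what the added `ConfigOptionsDocGeneratorTest#testConfigOptionExclusion` presumably verifies.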
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689659#comment-16689659 ] ASF GitHub Bot commented on FLINK-10455: pnowojski closed pull request #7107: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7107 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 84973726113..fe383ad6801 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception { } // make sure we propagate pending errors checkErroneous(); + pendingTransactions().forEach(transaction -> + IOUtils.closeQuietly(transaction.getValue().producer) + ); } // --- Logic for handling checkpoint flushing -- // @@ -714,8 +718,11 @@ protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011 protected void commit(KafkaTransactionState transaction) { switch (semantic) { case EXACTLY_ONCE: - transaction.producer.commitTransaction(); - recycleTransactionalProducer(transaction.producer); + try { + transaction.producer.commitTransaction(); + } finally { + recycleTransactionalProducer(transaction.producer); + } break; case AT_LEAST_ONCE: case NONE: diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 2ffb6d5810e..62562623fc4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -38,20 +38,24 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.time.Clock; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import static 
java.util.Objects.requireNonNull; import static org.apache.flink.util.Preconditions.checkArgument; @@ -149,6 +153,12 @@ protected TXN currentTransaction() { return currentTransactionHolder == null ? null : currentTransactionHolder.handle; } + @Nonnull + protected Stream> pendingTransactions() { + return pendingCommitTransactions.entrySet().stream() + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().handle)); + } + // -- methods that should be implemented in child class to support two phase commit algorithm -- /** @@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception { Iterator>> pendingTransactionIterator =
[jira] [Reopened] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reopened FLINK-10455: > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get a {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination
pnowojski commented on issue #7107: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/7107#issuecomment-439457778 Thanks @azagrebin, merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1
pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 67ec39d8f27..63d43994687 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -36,7 +36,7 @@ under the License. jar - 2.0.0 + 2.0.1 diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index fcfe4bf69f0..1a6a80e9c8f 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -114,7 +114,7 @@ under the License. as we neither access nor package the kafka dependencies --> org.apache.kafka kafka-clients - 2.0.0 + 2.0.1 diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index 5dd68838ba7..8ace250a319 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -20,7 +20,7 @@ set -Eeuo pipefail KAFKA_CONNECTOR_VERSION="2.0" -KAFKA_VERSION="2.0.0" +KAFKA_VERSION="2.0.1" CONFLUENT_VERSION="5.0.0" CONFLUENT_MAJOR_VERSION="5.0" KAFKA_SQL_VERSION="universal" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh index 94e89a2b1e8..0941cf200f1 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh @@ -19,4 +19,4 @@ set -Eeuo pipefail -source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" "universal" +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.1 5.0.0 5.0 "kafka" "universal" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh index 044d2237988..ff36cf1d778 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh @@ -20,6 +20,6 @@ set -Eeuo pipefail source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0 +source "$(dirname "$0")"/kafka-common.sh 2.0.1 5.0.0 5.0 source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/KafkaExample.jar This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1
[ https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689681#comment-16689681 ] ASF GitHub Bot commented on FLINK-10891: pnowojski closed pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 67ec39d8f27..63d43994687 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -36,7 +36,7 @@ under the License. jar - 2.0.0 + 2.0.1 diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index fcfe4bf69f0..1a6a80e9c8f 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -114,7 +114,7 @@ under the License. as we neither access nor package the kafka dependencies --> org.apache.kafka kafka-clients - 2.0.0 + 2.0.1 diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index 5dd68838ba7..8ace250a319 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -20,7 +20,7 @@ set -Eeuo pipefail KAFKA_CONNECTOR_VERSION="2.0" -KAFKA_VERSION="2.0.0" +KAFKA_VERSION="2.0.1" CONFLUENT_VERSION="5.0.0" CONFLUENT_MAJOR_VERSION="5.0" KAFKA_SQL_VERSION="universal" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh index 94e89a2b1e8..0941cf200f1 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh @@ -19,4 +19,4 @@ set -Eeuo pipefail -source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" "universal" +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.1 5.0.0 5.0 "kafka" "universal" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh index 044d2237988..ff36cf1d778 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh @@ -20,6 +20,6 @@ set -Eeuo pipefail source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0 +source "$(dirname "$0")"/kafka-common.sh 2.0.1 5.0.0 5.0 source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/KafkaExample.jar This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Upgrade Kafka client version to 2.0.1 > - > > Key: FLINK-10891 > URL: https://issues.apache.org/jira/browse/FLINK-10891 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Since the modern Kafka connector only tracks the latest version of > the Kafka client, and Kafka 2.0.1 has been released, we should upgrade the > Kafka client Maven dependency accordingly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1
pnowojski commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101#issuecomment-439462934 I'm merging it, thanks again for the contribution @yanghua. Regarding the issues, I always search the Apache JIRA for either a failing test name or an exception message, since one of those is always mentioned somewhere. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10904) Expose Classloader before Pipeline execution
[ https://issues.apache.org/jira/browse/FLINK-10904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689722#comment-16689722 ] Stephan Ewen commented on FLINK-10904: -- Anywhere in the Flink processes (TaskManager) where any application code is involved, the classloader should be available as the thread context classloader. It should be available there before any execution starts; even when deserializing the code from the RPC payload, it should already be available. > Expose Classloader before Pipeline execution > > > Key: FLINK-10904 > URL: https://issues.apache.org/jira/browse/FLINK-10904 > Project: Flink > Issue Type: Wish >Reporter: Luka Jurukovski >Priority: Minor > > Not sure if this is something that I just have to deal with. However, it would > be nice if the classloader could be accessed before the pipeline starts > executing. The use case is that I am loading classes that contain Flink > operators. I am running into NoClassDefFoundError issues because the > classloader used by Flink is different from the one used by the application > program being run. Currently, my workaround is to add the libraries that cause > these issues to the /lib directory of the Flink cluster and mark them as > provided in the application jar that is uploaded to the Flink cluster. The > issues with this are twofold: first, it makes deployment more complex; second, > there are cases where classloading causes exceptions in unusual circumstances, > e.g. the RabbitMQ connector caused issues only when it was auto-recovering the > connection, but not during normal ingest. I can elaborate further if needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
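A minimal sketch of what this means for user code (the class name com.example.DynamicOperator and the operator below are hypothetical, for illustration only): inside an operator running on a TaskManager, dynamically loaded classes should be resolved against the thread context classloader, which is Flink's user-code classloader, rather than against the system classloader.

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;

public class DynamicLoadingMap extends RichMapFunction<String, String> {

	@Override
	public String map(String value) throws Exception {
		// On the TaskManager, the context classloader is the user-code classloader.
		ClassLoader userCodeLoader = Thread.currentThread().getContextClassLoader();
		// Resolve a dynamically loaded class against it, not the system classloader.
		Class<?> clazz = Class.forName("com.example.DynamicOperator", true, userCodeLoader);
		return clazz.getName() + ": " + value;
	}
}
{code}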
[GitHub] twalthr commented on issue #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface
twalthr commented on issue #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface URL: https://github.com/apache/flink/pull/3880#issuecomment-439417094 I also agree with @fhueske. I will close this PR for now. The affected classes might be ported to Java soon anyway due to FLINK-10689. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface
twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface URL: https://github.com/apache/flink/pull/3880 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 9f50f0c806a..1aab7ae0993 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -68,6 +68,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable.HashMap import _root_.scala.annotation.varargs +import _root_.scala.util.{Try, Success, Failure} /** * The abstract base class for batch and stream TableEnvironments. @@ -340,10 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) { // check if class could be instantiated checkForInstantiation(function.getClass) -val typeInfo: TypeInformation[_] = if (function.getResultType != null) { - function.getResultType -} else { - implicitly[TypeInformation[T]] +val typeInfo: TypeInformation[_] = Try { + function.getClass.getDeclaredMethod("getResultType") +} match { + case Success(m) => m.invoke(function).asInstanceOf[TypeInformation[_]] + case Failure(_) => implicitly[TypeInformation[T]] } // register in Table API diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala new file mode 100644 index 000..d781e76071d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall} +import org.apache.flink.table.functions.ScalarFunction + +/** + * Holds methods to convert a [[ScalarFunction]] call + * in the Scala Table API into a [[ScalarFunctionCall]]. + * + * @param sf The ScalarFunction to convert. + */ +class ScalarFunctionConversions(sf: ScalarFunction) { + /** +* Creates a [[ScalarFunctionCall]] in Scala Table API. 
+* +* @param params actual parameters of function +* @return [[Expression]] in form of a [[ScalarFunctionCall]] +*/ + final def apply(params: Expression*): Expression = { +ScalarFunctionCall(sf, params) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala index 692876fe2da..292e241077a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala @@ -24,6 +24,8 @@ import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.plan.logical.LogicalTableFunctionCall +import scala.util.{Failure, Success, Try} + /** * Holds methods to convert a [[TableFunction]] call in the Scala Table API into a [[Table]]. * @@ -39,7 +41,12 @@ class TableFunctionConversions[T](tf: TableFunction[T]) { */ final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): Table = { -val resultType = if (tf.getResultType == null) typeInfo else tf.getResultType +val resultType: TypeInformation[_] = Try { +
[jira] [Commented] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print
[ https://issues.apache.org/jira/browse/FLINK-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689512#comment-16689512 ] Fabian Hueske commented on FLINK-10889: --- Hmm, OK. I see. This (or a similar) behavior would in fact be needed for Zeppelin. Unfortunately, DataStream.print() is a public API that we should avoid breaking. I know it is only declared as {{@PublicEvolving}}, but still. Maybe we can introduce another method for this. We have been thinking about a REST service for SQL queries that also supports streaming results back. Although further out, this might be a better approach in the long term. > Semantic inconsistency between DataSet#print and DataStream#print > - > > Key: FLINK-10889 > URL: https://issues.apache.org/jira/browse/FLINK-10889 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: Jeff Zhang >Assignee: vinoyang >Priority: Major > > DataSet#print prints the result on the client side, while DataStream#print > prints it on the TaskManagers. This inconsistency will confuse users. IMHO, we > should make the behavior consistent between DataSet and DataStream; I prefer > printing the result on the client side. Regarding DataStream#print, we can use > DataStreamUtils#collect to print it on the client side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
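For reference, a minimal sketch of the DataStreamUtils#collect approach mentioned in the description. The package of DataStreamUtils has moved between releases, so the import below is an assumption for recent versions; collect() runs the job and streams the results back to the client.

{code:java}
import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSidePrint {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		// collect() executes the pipeline and ships results to the client,
		// so the output appears in the client JVM, like DataSet#print.
		Iterator<String> results = DataStreamUtils.collect(stream);
		while (results.hasNext()) {
			System.out.println(results.next());
		}
	}
}
{code}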
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689525#comment-16689525 ] ASF GitHub Bot commented on FLINK-10869: StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234235828 ## File path: flink-end-to-end-tests/test-scripts/common_s3.sh ## @@ -68,8 +68,8 @@ function s3_setup { trap s3_cleanup EXIT cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/ - echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml" Review comment: I don't know all the implications of changing/removing these variables in bash scripts ;-) It is also orthogonal to the change here, so I would like to leave this for a dedicated cleanup step. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future S3 tests should refer to the following environment variables: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
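As an aside, a sketch of how a test utility can resolve the new variables and skip S3 tests when credentials are absent. The class and method names below are assumptions for illustration; the helper actually touched by the PR is S3TestCredentials.

{code:java}
import org.junit.Assume;

public final class S3TestSettings {

	public static String getBucket() {
		return requireEnv("IT_CASE_S3_BUCKET");
	}

	public static String getAccessKey() {
		return requireEnv("IT_CASE_S3_ACCESS_KEY");
	}

	public static String getSecretKey() {
		return requireEnv("IT_CASE_S3_SECRET_KEY");
	}

	private static String requireEnv(String name) {
		String value = System.getenv(name);
		// Skip (rather than fail) S3 tests on machines without credentials.
		Assume.assumeTrue("Skipping S3 test: " + name + " is not set", value != null);
		return value;
	}

	private S3TestSettings() {
	}
}
{code}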
[GitHub] StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.
StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234235828 ## File path: flink-end-to-end-tests/test-scripts/common_s3.sh ## @@ -68,8 +68,8 @@ function s3_setup { trap s3_cleanup EXIT cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/ - echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml" Review comment: I don't know all the implications of changing/removing these variables in bash scripts ;-) It is also orthogonal to the change here, so I would like to leave this for a dedicated cleanup step. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10903) Shade Internal Akka Dependencies
[ https://issues.apache.org/jira/browse/FLINK-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689718#comment-16689718 ] Stephan Ewen commented on FLINK-10903: -- Are you referring specifically to the Akka dependency, or to the Scala dependency as a whole? > Shade Internal Akka Dependencies > > > Key: FLINK-10903 > URL: https://issues.apache.org/jira/browse/FLINK-10903 > Project: Flink > Issue Type: Wish >Reporter: Luka Jurukovski >Priority: Minor > > Akka is not a publicly exposed API, yet it forces developers (particularly in > Scala) to use an older version. It would be nice if it were shaded so that > developers would be free to use the version of their choosing without needing > to worry about binary backwards compatibility, or about the case where a user > is forced to use parent-first classloading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution
[ https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689673#comment-16689673 ] ASF GitHub Bot commented on FLINK-10880: tillrohrmann commented on a change in pull request #7129: [FLINK-10880] Exclude JobManagerOptions#EXECUTION_FAILOVER_STRATEGY from documentation URL: https://github.com/apache/flink/pull/7129#discussion_r234281426 ## File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ## @@ -106,6 +106,7 @@ /** * This option specifies the failover strategy, i.e. how the job computation recovers from task failures. */ + @Documentation.ExcludeFromDocumentation Review comment: Good idea. Will change it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Failover strategies should not be applied to Batch Execution > > > Key: FLINK-10880 > URL: https://issues.apache.org/jira/browse/FLINK-10880 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.6.2 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.3, 1.7.0 > > > When configuring a failover strategy other than "full", DataSet/Batch > execution is currently not correct. This is expected: the failover region > strategy is an experimental WIP feature for streaming that has not been > extended to the DataSet API. > We need to document this and prevent execution of DataSet features with > failover strategies other than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
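For context, a sketch of what the annotated option could look like after the change. The exclusion-reason string is an assumption (as is whether a reason is passed at all); the option key and default value match the existing JobManagerOptions definition.

{code:java}
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class JobManagerOptionsSketch {

	/**
	 * This option specifies the failover strategy, i.e. how the job computation
	 * recovers from task failures.
	 */
	@Documentation.ExcludeFromDocumentation(
		"The failover strategy is experimental and must not be used for batch execution")
	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
		ConfigOptions.key("jobmanager.execution.failover-strategy")
			.defaultValue("full");
}
{code}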
[jira] [Commented] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689489#comment-16689489 ] ASF GitHub Bot commented on FLINK-6036: --- twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition URL: https://github.com/apache/flink/pull/3569 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 760cf7588f4..1b07fd8998d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -18,7 +18,9 @@ package org.apache.flink.table.api +import com.google.common.base.Joiner import org.apache.flink.table.catalog.TableSourceConverter +import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec /** * Exception for all errors occurring during expression parsing. @@ -74,6 +76,50 @@ object ValidationException { */ case class UnresolvedException(msg: String) extends RuntimeException(msg) +/** + * Exception for an operation on a nonexistent partition + * + * @param dbdatabase name + * @param table table name + * @param partitionSpec partition spec + * @param cause the cause + */ +case class PartitionNotExistException( +db: String, +table: String, +partitionSpec: PartitionSpec, +cause: Throwable) +extends RuntimeException( + s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " + + s"does not exist in table $db.$table!", cause) { + + def this(db: String, table: String, partitionSpec: PartitionSpec) = +this(db, table, partitionSpec, null) + +} + +/** + * Exception for adding an already existent partition + * + * @param dbdatabase name + * @param table table name + * @param partitionSpec partition spec + * @param cause the cause + */ +case class PartitionAlreadyExistException( +db: String, +table: String, +partitionSpec: PartitionSpec, +cause: Throwable) +extends RuntimeException( + s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " + + s"already exists in table $db.$table!", cause) { + + def this(db: String, table: String, partitionSpec: PartitionSpec) = +this(db, table, partitionSpec, null) + +} + /** * Exception for an operation on a nonexistent table * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala index fcefa45fcc5..78798ce0b72 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala @@ -19,12 +19,81 @@ package org.apache.flink.table.catalog import org.apache.flink.table.api._ +import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec /** * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables. */ trait CrudExternalCatalog extends ExternalCatalog { + /** +* Adds a partition to the catalog. +* +* @param dbName The name of the table's database. 
+* @param tableName The name of the table. +* @param part Description of the partition to add. +* @param ignoreIfExists Flag to specify behavior if a partition with the given spec +* already exists: +* if set to false, it throws a PartitionAlreadyExistException, +* if set to true, nothing happens. +* @throws DatabaseNotExistException thrown if the database does not exist in the catalog. +* @throws TableNotExistException thrown if the table does not exist in the catalog. +* @throws PartitionAlreadyExistException thrown if the partition already exists and +*ignoreIfExists is false +*/ + @throws[DatabaseNotExistException] + @throws[TableNotExistException] + @throws[PartitionAlreadyExistException] + def createPartition( + dbName: String, + tableName: String, + part: ExternalCatalogTablePartition, + ignoreIfExists: Boolean): Unit + + /** +* Deletes partition from a database of the catalog. +* +* @param dbNameThe name of the
[GitHub] StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.
StephanEwen commented on a change in pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#discussion_r234236294 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java ## @@ -87,7 +79,8 @@ public static void cleanUp() throws IOException { @Test(expected = UnsupportedOperationException.class) public void requestingRecoverableWriterShouldThroughException() throws Exception { - FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri()); + URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri()); Review comment: It never used the `TEST_DATA_DIR`: the `getFileSystem()` call only uses the scheme and authority, so the path does not matter. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature
[ https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689566#comment-16689566 ] ASF GitHub Bot commented on FLINK-10900: pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234247629 ## File path: docs/dev/connectors/kafka.md ## @@ -73,21 +73,33 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message;>Kafka messages with timestamps both for producing and consuming. -flink-connector-kafka-0.11_2.11 +flink-connector-kafka-0.11{{ site.scala_version_suffix }} 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x Since 0.11.x Kafka does not support scala 2.10. This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging;>Kafka transactional messaging to provide exactly once semantic for the producer. -flink-connector-kafka_2.11 +flink-connector-kafka{{ site.scala_version_suffix }} 1.7.0 FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0 -This Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated flink-connector-kafka-0.11 and link-connector-kafka-0.10 respectively. - + +This universal Kafka connector attempts to track the latest version of the Kafka client. +The version of the client it uses may change between Flink releases. +Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. +However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated +flink-connector-kafka-0.11 and link-connector-kafka-0.10 respectively. + + Attention: as of Flink 1.7 the universal Kafka connector is considered to be + in a BETA status and might not be as stable as the 0.11 connector. + In case of problems with the universal connector, you can try to use flink-connector-kafka-0.11{{ site.scala_version_suffix }} + which should be compatible with all of the Kafka versions starting from 0.11. Review comment: This is a good point, but as far as I know this is relevant only if someone wants to upgrade `kafka.version` of the `flink-connector-kafka`. There were a couple of users who were successfully using `flink-connector-kafka-0.11` against Kafka 1.0, and it should work for the same reasons the `universal` connector should work with any Kafka version `0.11+`. I would like to merge this before the next release candidate, but if I'm wrong in this regard, please let me know here - we can always fix the documentation after the release :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mark Kafka 2.0 connector as beta feature > > > Key: FLINK-10900 > URL: https://issues.apache.org/jira/browse/FLINK-10900 > Project: Flink > Issue Type: Task > Components: Kafka Connector >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Given the test problems with the Kafka 2.0 connector, we should mark this > connector as a beta feature until we have fully understood why so many tests > deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out; I have fixed all `modern` references. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski closed pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta
pnowojski closed pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index f81080a4b53..0630c6ec7d6 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -73,7 +73,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message;>Kafka messages with timestamps both for producing and consuming. -flink-connector-kafka-0.11_2.11 +flink-connector-kafka-0.11{{ site.scala_version_suffix }} 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 @@ -81,13 +81,25 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is Since 0.11.x Kafka does not support scala 2.10. This connector supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging;>Kafka transactional messaging to provide exactly once semantic for the producer. -flink-connector-kafka_2.11 +flink-connector-kafka{{ site.scala_version_suffix }} 1.7.0 FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0 -This Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated flink-connector-kafka-0.11 and link-connector-kafka-0.10 respectively. - + +This universal Kafka connector attempts to track the latest version of the Kafka client. +The version of the client it uses may change between Flink releases. +Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. +However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated +flink-connector-kafka-0.11{{ site.scala_version_suffix }} and link-connector-kafka-0.10{{ site.scala_version_suffix }} respectively. + + Attention: as of Flink 1.7 the universal Kafka connector is considered to be + in a BETA status and might not be as stable as the 0.11 connector. + In case of problems with the universal connector, you can try to use flink-connector-kafka-0.11{{ site.scala_version_suffix }} + which should be compatible with all of the Kafka versions starting from 0.11. + + + @@ -101,7 +113,8 @@ Then, import the connector in your maven project: {% endhighlight %} -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html). +Note that the streaming connectors are currently not part of the binary distribution. +See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html). ## Installing Apache Kafka @@ -110,17 +123,21 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. 
+Starting with Flink 1.7, there is a new universal Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The universal Kafka connector is compatible with older and newer Kafka brokers through
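For illustration, a minimal usage sketch of the universal connector described in this diff. Topic names, the broker address, and the group id are placeholders; the unversioned `FlinkKafkaConsumer`/`FlinkKafkaProducer` classes are the ones the universal connector provides.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class UniversalKafkaExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		// The class names carry no Kafka version suffix: the universal
		// connector tracks the latest Kafka client at release time.
		DataStream<String> stream = env.addSource(
			new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));
		stream.addSink(
			new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

		env.execute("Universal Kafka connector example");
	}
}
```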
[jira] [Commented] (FLINK-10900) Mark Kafka 2.0 connector as beta feature
[ https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689579#comment-16689579 ] ASF GitHub Bot commented on FLINK-10900: pnowojski commented on a change in pull request #7109: [FLINK-10900][kafka][docs] Mark universal Kafka connector as beta URL: https://github.com/apache/flink/pull/7109#discussion_r234248328 ## File path: docs/dev/connectors/kafka.md ## @@ -110,13 +123,17 @@ Note that the streaming connectors are currently not part of the binary distribu ## Kafka 1.0.0+ Connector -Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release. +Starting with Flink 1.7, there is a new Kafka connector that does not track a specific Kafka major version. +Rather, it tracks the latest version of Kafka at the time of the Flink release. -If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. +If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. +If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version. ### Compatibility -The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. The modern Kafka connector is compatible with broker versions 0.11.0 or later, depending on the features used. For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility). +The modern Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. Review comment: I was just breaking lines here, but I can fix this while I'm at it. Thanks for pointing it out; I have fixed all `modern` references. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mark Kafka 2.0 connector as beta feature > > > Key: FLINK-10900 > URL: https://issues.apache.org/jira/browse/FLINK-10900 > Project: Flink > Issue Type: Task > Components: Kafka Connector >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Given the test problems with the Kafka 2.0 connector, we should mark this > connector as a beta feature until we have fully understood why so many tests > deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10906) docker-entrypoint.sh logs credentials during startup
[ https://issues.apache.org/jira/browse/FLINK-10906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10906: --- Labels: pull-request-available (was: ) > docker-entrypoint.sh logs credentials during startup > > > Key: FLINK-10906 > URL: https://issues.apache.org/jira/browse/FLINK-10906 > Project: Flink > Issue Type: Improvement > Components: Docker >Reporter: Konstantin Knauf >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > > {{docker-entrypoint.sh}} prints the full Flink configuration file, including > potentially sensitive configuration entries. > To be consistent, we should hide these values as {{GlobalConfiguration}} does. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
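For reference, a sketch of the masking the description refers to. GlobalConfiguration#isSensitive is the existing helper that flags keys containing e.g. "password" or "secret"; the wrapper method and class below are assumptions for illustration.

{code:java}
import org.apache.flink.configuration.GlobalConfiguration;

public final class ConfigLogging {

	// Returns a value that is safe to log: sensitive entries are masked.
	public static String displayValue(String key, String value) {
		return GlobalConfiguration.isSensitive(key) ? "******" : value;
	}

	private ConfigLogging() {
	}
}
{code}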
[GitHub] tillrohrmann opened a new pull request #7127: [BP-1.6][FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh
tillrohrmann opened a new pull request #7127: [BP-1.6][FLINK-10906][docker] Don't print configuration in docker-entrypoint.sh URL: https://github.com/apache/flink/pull/7127 Backport of #7125 to `release-1.6`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services