[GitHub] flink pull request #6419: [FLINK-9949][tests] Kill Flink processes in DB/tea...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6419 [FLINK-9949][tests] Kill Flink processes in DB/teardown ## What is the purpose of the change *Not killing Flink processes at the end of a test can cause interference with subsequent test runs.* ## Brief change log - *Kill Flink processes in `DB/teardown!`.* ## Verifying this change This change added tests and can be verified as follows: - *Ran tests in docker.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9949 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6419.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6419 commit db3cecb5c9fb16d707e02b436244ba8fd5ee1ce8 Author: gyao Date: 2018-07-25T13:28:40Z [FLINK-9949][tests] Kill Flink processes in DB/teardown ---
[GitHub] flink pull request #6418: [FLINK-9939][runtime] Mesos: Not setting TMP dirs ...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6418 [FLINK-9939][runtime] Mesos: Not setting TMP dirs causes NPE ## What is the purpose of the change *This fixes a possible NPE when deploying on Mesos.* ## Brief change log - *Add null check to `BootstrapTools.updateTmpDirectoriesInConfiguration(...)`, and add `@Nullable` annotation.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit test to assert that `updateTmpDirectoriesInConfiguration` can handle `null` values.* - *Manually deployed on Mesos.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9939 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6418.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6418 commit eb03ada77db7633d330be7847dbdf9ee801a9bee Author: gyao Date: 2018-07-25T10:23:20Z [hotfix][tests] Fix checkstyle violations in BootstrapToolsTest. 
commit 261d4d7423b9ca179ac0004625f51b7b71655d63 Author: gyao Date: 2018-07-25T11:57:34Z [FLINK-9939][runtime] Add null check before setting tmp dirs in config. ---
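The fix above amounts to guarding a configuration update against a `null` default value. A minimal framework-free sketch of the idea; the real method is `BootstrapTools.updateTmpDirectoriesInConfiguration(...)` operating on Flink's `Configuration` class, while the map-based version and key name below are simplifications for illustration:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of the null-guarded tmp-dir update (not the actual BootstrapTools API). */
public class TmpDirGuard {

    static final String TMP_DIRS_KEY = "io.tmp.dirs";

    /**
     * Sets the default tmp directories only when the user has not configured them
     * and the default is non-null. Before the fix, passing null here could NPE.
     */
    static void updateTmpDirectories(Map<String, String> config, /* @Nullable */ String defaultDirs) {
        if (config.containsKey(TMP_DIRS_KEY)) {
            return; // an explicitly configured value wins
        }
        if (defaultDirs != null) {
            config.put(TMP_DIRS_KEY, defaultDirs);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        updateTmpDirectories(config, null);          // must be a no-op, not an NPE
        System.out.println(config.containsKey(TMP_DIRS_KEY)); // false
        updateTmpDirectories(config, "/tmp/flink");
        System.out.println(config.get(TMP_DIRS_KEY));         // /tmp/flink
    }
}
```

The unit test mentioned in the PR description asserts exactly the first case: calling the method with `null` leaves the configuration untouched instead of throwing.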
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6406 [FLINK-9159][runtime] Sanity check default timeout values ## What is the purpose of the change *Set the default timeouts for resource release to sane values. Consolidate config keys and documentation.* ## Brief change log - *Set default value of `mesos.failover-timeout` to 1 week.* - *Deprecate config key `slotmanager.request-timeout`* ## Verifying this change This change added tests and can be verified as follows: - *Added test `SlotManagerConfigurationTest` to verify that slot request timeouts are set correctly.* - *Manually deployed on Mesos 1.5.0.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9159 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6406.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6406 commit 96ef18e322c5623a609382057985b35971ba3234 Author: gyao Date: 2018-07-24T13:38:47Z [FLINK-9159][mesos] Set default value of mesos.failover-timeout to 1 week. 
commit 112122912d7dd78c612c1648f3e2b041ae65afa6 Author: gyao Date: 2018-07-24T13:48:27Z [FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout - Replace config key slotmanager.request-timeout with slot.request.timeout because both keys have effectively the same semantics. - Rename key slotmanager.taskmanager-timeout to resourcemanager.taskmanager-timeout. commit 787f7c1480a5676e7ce52092265b3cd051064e3c Author: gyao Date: 2018-07-24T13:55:16Z [hotfix][docs] Add -DskipTests flag to command that generates docs. ---
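The key changes from the commits above can be summarized as a `flink-conf.yaml` fragment. Only the key names come from the commit messages; the values are illustrative (assuming milliseconds for the timeouts and seconds for the Mesos failover timeout, where 1 week = 604800 s):

```yaml
# Deprecated by this change (kept for backwards compatibility):
# slotmanager.request-timeout: 300000

# Replacement key with effectively the same semantics:
slot.request.timeout: 300000

# Renamed from slotmanager.taskmanager-timeout:
resourcemanager.taskmanager-timeout: 30000

# New default raised to 1 week:
mesos.failover-timeout: 604800
```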
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204400729 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); + + // start completing the slot requests asynchronously + executor.execute( + () -> { + slotRequestsBeingFulfilled.trigger(); + + for (SlotRequestId slotRequestId : slotRequestIds.keySet()) { + final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId); + slotProvider.complete(slotRequestId, singleLogicalSlot); + } + }); + + // make sure that we complete cancellations of deployed tasks + taskManagerGateway.setCancelConsumer( + (ExecutionAttemptID executionAttemptId) -> { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId); + + // if the execution was cancelled in state SCHEDULING, then it might already have been removed + if (execution != null) { + execution.cancelingComplete(); + } + } + ); + + slotRequestsBeingFulfilled.await(); + + executionGraph.cancel(); + + expectedSlotRequestIds.waitForAllSlotRequestIds(); + } + + private static final class ExpectedSlotRequestIds { --- End diff -- I think this is fine but I would have probably used a synchronized set and a `CountDownLatch`. It would allow for timeouts and also asserting on which ids are missing. ---
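The alternative suggested in the comment, a synchronized set combined with a `CountDownLatch`, can be sketched as follows. The class and method names are made up for illustration; this is not the `ExpectedSlotRequestIds` class from the PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Tracks a fixed set of expected ids; awaiting supports a timeout and reports missing ids. */
public class ExpectedIds<T> {

    private final Set<T> missing;
    private final CountDownLatch latch;

    public ExpectedIds(Set<T> expected) {
        this.missing = Collections.synchronizedSet(new HashSet<>(expected));
        this.latch = new CountDownLatch(expected.size());
    }

    /** Counts down only for ids we were actually waiting for (and only once per id). */
    public void notifyId(T id) {
        if (missing.remove(id)) {
            latch.countDown();
        }
    }

    /** Returns the ids still missing after the timeout; an empty set means all arrived. */
    public Set<T> awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        latch.await(timeout, unit);
        synchronized (missing) {
            return new HashSet<>(missing);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> expected = new HashSet<>();
        expected.add("a");
        expected.add("b");
        ExpectedIds<String> tracker = new ExpectedIds<>(expected);
        tracker.notifyId("a");
        // times out quickly and tells us exactly which id never arrived
        System.out.println(tracker.awaitAll(50, TimeUnit.MILLISECONDS)); // [b]
    }
}
```

This is what the comment means by "allow for timeouts and also asserting on which ids are missing": a failed wait can produce an assertion message listing the absent ids instead of hanging or failing opaquely.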
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204399866 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); --- End diff -- What are the benefits of `OneShotLatch` compared to `new CountDownLatch(1)`? At first glance it seems like an unnecessary re-implementation. ---
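For context on that question: a `CountDownLatch` with count 1 already provides one-shot trigger/await semantics out of the box (whatever test-specific extras Flink's `OneShotLatch` adds aside). A minimal sketch:

```java
import java.util.concurrent.CountDownLatch;

// A CountDownLatch with count 1 behaves as a one-shot latch: the first
// countDown() permanently releases every current and future await() call.
public class OneShotDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // "trigger" from another thread
        Thread worker = new Thread(latch::countDown);
        worker.start();

        latch.await(); // blocks until triggered
        latch.await(); // already triggered: returns immediately
        System.out.println("triggered");
    }
}
```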
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204394265 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -384,6 +393,8 @@ private MultiTaskSlot( MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { Preconditions.checkState(!super.contains(groupId)); + LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId); --- End diff -- add `[]` ---
[GitHub] flink issue #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failAllocati...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6386 https://github.com/apache/flink/pull/6369 should also be merged after this ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204384134 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -983,7 +983,7 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor @Override public void notifyAllocationFailure(AllocationID allocationID, Exception cause) { - slotPool.failAllocation(allocationID, cause); + slotPoolGateway.failAllocation(allocationID, cause); --- End diff -- The changes don't match the commit message. ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204373664 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -323,7 +323,7 @@ public void disconnectResourceManager() { boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute()); + log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); --- End diff -- When should the value be enclosed in `[]`? ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204368060 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -1011,7 +1011,7 @@ public void failAllocation(final AllocationID allocationID, final Exception caus failPendingRequest(pendingRequest, cause); } else if (availableSlots.tryRemove(allocationID)) { - log.debug("Failed available slot [{}] with ", allocationID, cause); + log.debug("Failed available slot with allocation id {}.", allocationID, cause); --- End diff -- Why remove *allocation id*? ---
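Both log statements in this diff rely on an SLF4J convention: when the last argument is a `Throwable` and there are more arguments than `{}` placeholders, the `Throwable` is attached to the log event (stack trace and all) rather than substituted into the message. The sketch below re-implements that rule in plain Java purely for illustration; it is not SLF4J's actual formatter:

```java
import java.util.Arrays;
import java.util.regex.Matcher;

/** Plain-Java illustration of SLF4J's trailing-Throwable rule. */
public class Slf4jArgRule {

    static String format(String pattern, Object... args) {
        Object[] fmtArgs = args;
        Throwable throwable = null;
        // count '{}' placeholders in the pattern
        int placeholders = pattern.split("\\{\\}", -1).length - 1;
        // surplus trailing Throwable -> treated as the event's exception
        if (args.length > placeholders && args[args.length - 1] instanceof Throwable) {
            throwable = (Throwable) args[args.length - 1];
            fmtArgs = Arrays.copyOf(args, args.length - 1);
        }
        String msg = pattern;
        for (Object a : fmtArgs) {
            msg = msg.replaceFirst("\\{\\}", Matcher.quoteReplacement(String.valueOf(a)));
        }
        return msg + (throwable == null ? "" : " [exception attached: " + throwable.getClass().getSimpleName() + "]");
    }

    public static void main(String[] args) {
        Exception cause = new IllegalStateException("boom");
        // mirrors: log.debug("Failed available slot with allocation id {}.", allocationID, cause)
        System.out.println(format("Failed available slot with allocation id {}.", "abc123", cause));
    }
}
```

So even though `cause` has no matching placeholder in either variant of the diff, SLF4J will still log its stack trace.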
[GitHub] flink pull request #6369: [FLINK-9892][tests] Disable local recovery in Jeps...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6369 [FLINK-9892][tests] Disable local recovery in Jepsen tests. ## What is the purpose of the change *Until FLINK-9635 is fixed, local recovery should be disabled in the Jepsen tests.* ## Brief change log - *Disable local recovery in Jepsen tests.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9892 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6369 commit 6ac6963d4770d2b10283d14e55ff07a210cfa2c2 Author: gyao Date: 2018-07-19T07:35:44Z [FLINK-9892][tests] Disable local recovery in Jepsen tests. ---
[GitHub] flink pull request #6368: [FLINK-9890] Remove obsolete class ResourceManager...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6368 [FLINK-9890] Remove obsolete class ResourceManagerConfiguration ## What is the purpose of the change *The configuration values are effectively not used. This commit removes the class and all its usages.* ## Brief change log - *Remove ResourceManagerConfiguration and all its usages.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9890 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6368 commit 50e05c436511c84d58561a6909520e28666f7358 Author: gyao Date: 2018-07-18T14:23:59Z [FLINK-9890][Distributed Coordination] Remove obsolete class ResourceManagerConfiguration The configuration values are effectively not used. This commit removes the class and all its usages. ---
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6297 @dawidwys Please merge. ---
[GitHub] flink issue #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6320 I tested the tooling around docker-compose and it works for me when using the local distribution. ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202351378 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job + +or + +docker build -t flink . --- End diff -- There are some args missing to make these commands work. Is it intentional? ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202348473 --- Diff: flink-container/docker/build.sh --- @@ -0,0 +1,128 @@ +#!/bin/sh + + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +usage() { + cat <<HERE + build.sh --job-jar <path-to-job-jar> --from-local-dist [--image-name <image>] + build.sh --job-jar <path-to-job-jar> --from-archive <archive> [--image-name <image>] + build.sh --job-jar <path-to-job-jar> --from-release --flink-version <flink-version> --hadoop-version <hadoop-version> --scala-version <scala-version> [--image-name <image>] + build.sh --help + + If the --image-name flag is not used the built image name will be 'flink'. 
+HERE + exit 1 +} + +while [[ $# -ge 1 ]] +do +key="$1" + case $key in +--job-jar) +JOB_JAR_PATH="$2" +shift +;; +--from-local-dist) +FROM_LOCAL="true" +;; +--from-archive) +FROM_ARCHIVE="$2" +shift +;; +--from-release) +FROM_RELEASE="true" +;; +--image-name) +IMAGE_NAME="$2" +shift +;; +--flink-version) +FLINK_VERSION="$2" +shift +;; +--hadoop-version) +HADOOP_VERSION="$(echo "$2" | sed 's/\.//')" +shift +;; +--scala-version) +SCALA_VERSION="$2" +shift +;; +--kubernetes-certificates) +CERTIFICATES_DIR="$2" +shift +;; +--help) +usage +;; +*) +# unknown option +;; + esac + shift +done + +IMAGE_NAME=${IMAGE_NAME:-flink-job} + +# TMPDIR must be contained within the working directory so it is part of the +# Docker context. (i.e. it can't be mktemp'd in /tmp) +TMPDIR=_TMP_ + +cleanup() { +rm -rf "${TMPDIR}" +} +trap cleanup EXIT + +mkdir -p "${TMPDIR}" + +JOB_JAR_TARGET="${TMPDIR}/job.jar" +cp ${JOB_JAR_PATH} ${JOB_JAR_TARGET} + +if [ -n "${FROM_RELEASE}" ]; then + + [[ -n "${FLINK_VERSION}" ]] && [[ -n "${HADOOP_VERSION}" ]] && [[ -n "${SCALA_VERSION}" ]] || usage + + FLINK_BASE_URL="$(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/" + FLINK_DIST_FILE_NAME="flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz" + CURL_OUTPUT="${TMPDIR}/${FLINK_DIST_FILE_NAME}" + + echo "Downloading ${FLINK_DIST_FILE_NAME} from ${FLINK_BASE_URL}" + curl -s ${FLINK_BASE_URL}${FLINK_DIST_FILE_NAME} --output ${CURL_OUTPUT} --- End diff -- I think it's a bit surprising that for 1.5.0 the command will fail: `build.sh --job-jar ./job.jar --from-release --flink-version 1.5.0 --hadoop-version 28 --scala-version 2.11` `build.sh --job-jar ./job.jar --from-release --flink-version 1.5.1 --hadoop-version 28 --scala-version 2.11` ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202336290 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job --- End diff -- nit: The shebang in `build.sh` already tells the program loader to use `sh`. ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202332741 --- Diff: flink-container/pom.xml --- @@ -0,0 +1,67 @@ +
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202332323 --- Diff: flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}. 
+ */ +public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger { + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final int restPort = 1234; + final String jobClassName = "foobar"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final String jobClassName = "foobar"; + final String[] args = {"--configDir", configDir, "--job-classname", jobClassName}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + } + + @Test(expected = 
FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + final String[] args = {}; + + commandLineParser.parse(args); + fail("Expected an FlinkParseException."); --- End diff -- Superfluous `fail` ---
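The review note ("Superfluous `fail`") can be demonstrated without JUnit: once the call under test throws, control leaves the method, so with `@Test(expected = ...)` a trailing `fail()` is unreachable. A framework-free sketch with a made-up parser:

```java
/**
 * Illustrates why fail() after a call that is expected to throw is dead code.
 * The parser below is a hypothetical stand-in for CommandLineParser#parse.
 */
public class FailDemo {

    // Throws when required arguments are missing, like the parser in the test.
    static void parse(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("--configDir is required");
        }
    }

    public static void main(String[] args) {
        boolean threw = false;
        try {
            parse(new String[0]);
            // anything placed here, e.g. fail("Expected a FlinkParseException."),
            // can never execute when parse() throws
        } catch (IllegalArgumentException expected) {
            threw = true;
        }
        System.out.println(threw); // true
    }
}
```

With JUnit 4's `@Test(expected = ...)`, the framework performs the catch-and-assert itself, so the test body can simply end with the throwing call.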
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202299810 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link EntrypointClusterConfigurationParserFactory}. 
+ */ +public class EntrypointClusterConfigurationParserFactoryTest extends TestLogger { + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final int restPort = 1234; + final String key = "key"; + final String value = "value"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2}; + + final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final String[] args = {"--configDir", configDir}; + + final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + final String[] args = {}; + + commandLineParser.parse(args); + fail("Expected an FlinkParseException."); --- End diff 
-- The `fail` is superfluous here; with `@Test(expected = ...)` it is enough to let the exception propagate: ``` @Test(expected = FlinkParseException.class) public void testMissingRequiredArgument() throws FlinkParseException { final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); final String[] args = {}; commandLineParser.parse(args); } ```
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202300463 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link EntrypointClusterConfigurationParserFactory}. 
+ */ +public class EntrypointClusterConfigurationParserFactoryTest extends TestLogger { + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); --- End diff -- Could be moved to a `setUp` method. ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202305883 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java --- @@ -18,27 +18,43 @@ package org.apache.flink.runtime.entrypoint; -import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + +import java.util.Properties; /** * Configuration class which contains the parsed command line arguments for * the {@link ClusterEntrypoint}. */ public class ClusterConfiguration { + + @Nonnull private final String configDir; - private final int restPort; + @Nonnull + private final Properties dynamicProperties; + + @Nonnull + private final String[] args; - public ClusterConfiguration(String configDir, int restPort) { - this.configDir = Preconditions.checkNotNull(configDir); - this.restPort = restPort; + public ClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args) { --- End diff -- `@Nonnull` is not used consistently, e.g., `FlinkParseException` is not annotated. I think it's easier to assume everything is non-null by default, and the rest should be annotated with `@Nullable`. ---
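The convention suggested in the comment above — everything non-null by default, with only genuinely optional values annotated — can be sketched as follows. This is an illustration, not Flink's actual `ClusterConfiguration`; the `@Nullable` annotation is declared locally as a stand-in for `javax.annotation.Nullable` so the sketch is self-contained.

```java
import java.util.Objects;
import java.util.Properties;

// Local stand-in for javax.annotation.Nullable.
@interface Nullable {}

public class ClusterConfigurationSketch {

	// Fields are non-null by convention; no @Nonnull annotations needed.
	private final String configDir;
	private final Properties dynamicProperties;

	public ClusterConfigurationSketch(String configDir, @Nullable Properties dynamicProperties) {
		this.configDir = Objects.requireNonNull(configDir, "configDir");
		// Normalize the one genuinely optional value once, at the boundary.
		this.dynamicProperties = dynamicProperties != null ? dynamicProperties : new Properties();
	}

	public String getConfigDir() {
		return configDir;
	}

	public Properties getDynamicProperties() {
		return dynamicProperties;
	}
}
```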
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202300255 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link ClusterConfigurationParserFactory}. 
+ */ +public class ClusterConfigurationParserFactoryTest extends TestLogger { + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, String.format("-D%s=%s", key, value), arg1, arg2}; + + final ClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + final String configDir = "/foo/bar"; + final String[] args = {"--configDir", configDir}; + + final ClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + final String[] args = {}; + + commandLineParser.parse(args); + fail("Expected an FlinkParseException."); --- End diff -- Superfluous `fail` ---
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6320#discussion_r202300519 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link ClusterConfigurationParserFactory}. + */ +public class ClusterConfigurationParserFactoryTest extends TestLogger { + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); --- End diff -- Could be moved to a setup method. ---
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202271606 --- Diff: flink-container/docker/build.sh --- @@ -0,0 +1,118 @@ +#!/bin/sh --- End diff -- Just want to note that in other scripts, `bash` is used. Also `/usr/bin/env` is used to determine the `bash` binary. See `config.sh` for an example. ---
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 I extended the regular `README.md`. ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202057663 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- @dawidwys Why is it not enough? > When obtaining a value from the configuration via Configuration.getValue(ConfigOption), the deprecated keys will be checked in the order provided to this method. The first key for which a value is found will be used - that value will be returned. ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202020619 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- @dawidwys As I understand, backwards compatibility should be prioritized. See https://github.com/apache/flink/pull/5448#issuecomment-394296226 > We need to add an additional MemoryUnit.parse() method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified. ---
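The backwards-compatible parsing referred to in the quote — a bare number is treated as MB while an explicit unit is honored — can be sketched like this. It is an illustration only, not Flink's `MemorySize` implementation, and it handles just two units.

```java
// Parse a memory size into mebibytes, defaulting a unit-less value to MB
// for backwards compatibility with the legacy "*.heap.mb" style keys.
public class MemorySizeSketch {

	public static long parseMebiBytes(String text) {
		String s = text.trim().toLowerCase();
		if (s.endsWith("g")) {
			return Long.parseLong(s.substring(0, s.length() - 1).trim()) * 1024L;
		}
		if (s.endsWith("m")) {
			return Long.parseLong(s.substring(0, s.length() - 1).trim());
		}
		// No unit given: interpret as mebibytes, matching the old behavior.
		return Long.parseLong(s);
	}
}
```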
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202017985 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- Would it hurt to add `.withDeprecatedKeys("jobmanager.heap.mb")`? In case someone does not use the scripts to start the cluster. ---
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6297 cc: @dawidwys @StephanEwen ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r201980407 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- Was there a reason against making `JOB_MANAGER_HEAP_MEMORY_MB` a deprecated key for `JOB_MANAGER_HEAP_MEMORY` when you worked on FLINK-6469? That is: ``` public static final ConfigOption JOB_MANAGER_HEAP_MEMORY = key("jobmanager.heap.size") .defaultValue("1024m") .withDeprecatedKeys("jobmanager.heap.mb") ... ``` I am not sure what happens if someone only configures `jobmanager.heap.mb` in `flink-conf.yaml`. Will the value be respected for all deployment modes? Same holds for the TaskManager config key. ---
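The lookup order described in the quoted `Configuration` javadoc — the current key wins, otherwise deprecated keys are consulted in declaration order — can be sketched with a simplified stand-in (class and method names here are hypothetical, not Flink's API):

```java
import java.util.Map;

// Simplified deprecated-key fallback: check the current key first, then
// each deprecated key in the order it was declared.
public class DeprecatedKeyLookup {

	public static String getString(Map<String, String> conf, String defaultValue, String key, String... deprecatedKeys) {
		if (conf.containsKey(key)) {
			return conf.get(key);
		}
		for (String deprecatedKey : deprecatedKeys) {
			if (conf.containsKey(deprecatedKey)) {
				return conf.get(deprecatedKey);
			}
		}
		return defaultValue;
	}
}
```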
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r201981340 --- Diff: docs/ops/deployment/yarn_setup.md --- @@ -101,12 +101,12 @@ Usage: Optional -D Dynamic properties -d,--detached Start detached - -jm,--jobManagerMemory Memory for JobManager Container [in MB] + -jm,--jobManagerMemory Memory for JobManager Container [with unit, if not, use MB] --- End diff -- nit: I would reword this to: > *Memory for JobManager container with optional unit (default: MB).* ---
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6297 I will take a look later. ---
[GitHub] flink pull request #6296: When submitting a flink job with yarn-cluster, fli...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6296#discussion_r201570938 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -1189,7 +1189,7 @@ private static Path setupSingleLocalResource( @Override public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException { - + String fileName = file.getFileName().toString(); --- End diff -- `fileName` is not used ---
[GitHub] flink pull request #6279: [FLINK-9706] Properly wait for termination of JobM...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6279#discussion_r201329956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -536,7 +540,25 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) { final CompletableFuture cleanupFuture = removeJob(jobId, cleanupHA); - registerOrphanedJobManagerTerminationFuture(cleanupFuture); + registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture); + } + + private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture jobManagerRunnerTerminationFuture) { + Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId)); + + jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture); + + // clean up the pending termination future + jobManagerRunnerTerminationFuture.thenRunAsync( + () -> { + final CompletableFuture terminationFuture = jobManagerTerminationFutures.remove(jobId); + + //noinspection ObjectEquality + if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) { + jobManagerTerminationFutures.put(jobId, terminationFuture); --- End diff -- Here you handle a case where a terminationFuture for a job got replaced. Under what circumstances can this happen? Doesn't the `checkState`: `Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));` prevent this? ---
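The race asked about above — a resubmitted job registering a new future under the same job id before the old future's cleanup callback has run — can also be handled with a conditional remove instead of the remove-then-put-back dance. A sketch with hypothetical names, using `ConcurrentHashMap`'s two-argument `remove` so the callback only deletes the entry if it still refers to this future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TerminationFutureRegistry {

	private final ConcurrentHashMap<String, CompletableFuture<Void>> futures = new ConcurrentHashMap<>();

	public void register(String jobId, CompletableFuture<Void> terminationFuture) {
		futures.put(jobId, terminationFuture);
		// Only remove the entry if it is still *this* future; a newer future
		// registered under the same job id is left untouched.
		terminationFuture.thenRun(() -> futures.remove(jobId, terminationFuture));
	}

	public boolean hasPending(String jobId) {
		return futures.containsKey(jobId);
	}
}
```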
[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6240#discussion_r201312394 --- Diff: flink-jepsen/src/jepsen/flink/db.clj --- @@ -175,7 +175,7 @@ (c/su (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` " "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir -" " install-dir "/bin/yarn-session.sh -d -jm 2048 -tm 2048"))) +" " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m"))) --- End diff -- See https://issues.apache.org/jira/browse/FLINK-9777 ---
[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6240#discussion_r200667325 --- Diff: jepsen-flink/.gitignore --- @@ -0,0 +1,17 @@ +*.class +*.iml +*.jar +*.retry +.DS_Store +.hg/ +.hgignore +.idea/ +/.lein-* +/.nrepl-port +/checkouts +/classes +/target +pom.xml +pom.xml.asc +store +bin/DataStreamAllroundTestProgram.jar --- End diff -- Good point. I fixed it. ---
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 It seems that we have to stick with `0.1.8` for now. According to legal, there are no concerns so far: https://issues.apache.org/jira/browse/LEGAL-392 ---
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 `0.1.9` was problematic. Currently testing `0.1.10`. ---
[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6239 @zentol No problem, I opened a new one. ---
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 As @zentol mentioned, we might want to exclude this from create_source_release.sh. ---
[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6240 [FLINK-9004][tests] Implement Jepsen tests to test job availability. ## What is the purpose of the change *Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos.* Previous PR got closed accidentally: https://github.com/apache/flink/pull/6239 ## Brief change log - *Implement Jepsen tests.* ## Verifying this change This change added tests and can be verified as follows: - *The changes themselves are tests.* - *Run Jepsen tests in docker containers.* - *Run unit tests with `lein test`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no** (at least not to Flink)) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will as soon as test failures appear) / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann @cewood @zentol @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6240 commit 063e4621a5982b55ee7f7b0935290bbc717a5a45 Author: gyao Date: 2018-03-05T21:23:33Z [FLINK-9004][tests] Implement Jepsen tests to test job availability. Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos. Provide Dockerfiles for local test development. commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3 Author: gyao Date: 2018-07-02T12:21:18Z fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability. ---
[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6239#discussion_r199690915 --- Diff: jepsen-flink/README.md --- @@ -0,0 +1,60 @@ +# jepsen.flink + +A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the +distributed coordination of Apache Flink®. + +## Test Coverage +Jepsen is a framework built to test the behavior of distributed systems +under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a +job, and examine the availability of the job after injecting faults. +A job is said to be available if all the tasks of the job are running. +The faults that can be currently introduced to the Flink cluster include: +* Killing of TaskManager/JobManager processes +* Stopping HDFS NameNode +* Network partitions + +There are many more properties other than job availability that could be +verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing +semantics. + +## Usage +See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment) +for how to set up the environment to run tests. The `scripts/run-tests.sh` documents how to invoke +tests. The Flink job used for testing is located under +`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy +the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's +root. + +To simplify development, we have prepared Dockerfiles and a Docker Compose template +so that you can run the tests locally in containers. To build the images +and start the containers, simply run: + +$ cd docker +$ ./up.sh + +After the containers started, open a new terminal window and run `docker exec -it jepsen-control bash`. +This will allow you to run arbitrary commands on the control node. 
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory, +which expects the number of test iterations, and a URI to a Flink distribution, e.g., + +./docker/run-tests.sh 1 https://example.com/flink-dist.tgz + +The project's root is mounted as a volume to all containers under the path `/jepsen`. +This means that changes to the test sources are immediately reflected in the control node container. +Moreover, this allows you to test locally built Flink distributions by copying the tarball to the +project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g., +`file:///jepsen/flink-dist.tgz`. + +### Checking the output of tests + +Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either + +Everything looks good! ヽ('ー`)ノ + +or + +Analysis invalid! (ﾉಥ益ಥ）ﾉ ┻━┻ --- End diff -- The Chinese character is the mouth and the eyebrows of the emoji: https://github.com/jepsen-io/jepsen/blob/dd197bbce0b92c1ab3423709ac6bb0b2ee853365/jepsen/src/jepsen/core.clj#L532 ---
[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6239#discussion_r199622404 --- Diff: jepsen-flink/README.md --- @@ -0,0 +1,60 @@ +# jepsen.flink + +A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the +distributed coordination of Apache Flink®. + +## Test Coverage +Jepsen is a framework built to test the behavior of distributed systems +under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a +job, and examine the availability of the job after injecting faults. +A job is said to be available if all the tasks of the job are running. +The faults that can be currently introduced to the Flink cluster include: +* Killing of TaskManager/JobManager processes +* Stopping HDFS NameNode +* Network partitions + +There are many more properties other than job availability that could be +verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing +semantics. + +## Usage +See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment) +for how to set up the environment to run tests. The `scripts/run-tests.sh` documents how to invoke +tests. The Flink job used for testing is located under +`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy +the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's +root. + +To simplify development, we have prepared Dockerfiles and a Docker Compose template +so that you can run the tests locally in containers. To build the images +and start the containers, simply run: + +$ cd docker +$ ./up.sh + +After the containers started, open a new terminal window and run `docker exec -it jepsen-control bash`. +This will allow you to run arbitrary commands on the control node. 
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory, +which expects the number of test iterations, and a URI to a Flink distribution, e.g., + +./docker/run-tests.sh 1 https://example.com/flink-dist.tgz + +The project's root is mounted as a volume to all containers under the path `/jepsen`. +This means that changes to the test sources are immediately reflected in the control node container. +Moreover, this allows you to test locally built Flink distributions by copying the tarball to the +project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g., +`file:///jepsen/flink-dist.tgz`. + +### Checking the output of tests + +Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either + +Everything looks good! ヽ('ー`)ノ + +or + +Analysis invalid! (ﾉಥ益ಥ）ﾉ ┻━┻ --- End diff -- it's a "flipping table emoji" ---
[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6239#discussion_r199507669 --- Diff: jepsen-flink/docker/nodes --- @@ -0,0 +1,3 @@ +n1 --- End diff -- The file must be excluded from the RAT plugin. ---
[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6239 [FLINK-9004][tests] Implement Jepsen tests to test job availability. ## What is the purpose of the change *Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos. Provide Dockerfiles for local test development.* ## Brief change log - *Implement Jepsen tests.* ## Verifying this change This change added tests and can be verified as follows: - *The changes themselves are tests.* - *Run Jepsen tests in docker containers.* - *Run unit tests with `lein test`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no** (at least not to Flink)) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will as soon as test failures appear) / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann @cewood You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6239.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6239 commit 063e4621a5982b55ee7f7b0935290bbc717a5a45 Author: gyao Date: 2018-03-05T21:23:33Z [FLINK-9004][tests] Implement Jepsen tests to test job availability. Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos. Provide Dockerfiles for local test development. ---
[GitHub] flink pull request #6213: [FLINK-9672] Fail fatally if job submission fails ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6213#discussion_r198912529 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -322,9 +322,6 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except @Override public CompletableFuture> listJobs(Time timeout) { - if (jobManagerRunners.isEmpty()) { --- End diff -- nice ---
[GitHub] flink issue #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage of AMQP ...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5410 Taking a look once more. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r194109528 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + private final RMQSinkPublishOptions messageCompute; + private final ReturnListener returnListener; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes +* @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties */ - public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r194109518 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -105,7 +153,18 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (messageCompute == null) { + channel.basicPublish("", queueName, null, msg); + } else { + String rk = messageCompute.computeRoutingKey(value); + String exchange = messageCompute.computeExchange(value); + channel.basicPublish((exchange != null) ? exchange : "", --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r194109540 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + private final RMQSinkPublishOptions messageCompute; + private final ReturnListener returnListener; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes +* @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties */ - public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, + RMQSinkPublishOptions messageCompute, ReturnListener returnListener) { --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193965626 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -105,7 +155,24 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (publishOptions == null) { + channel.basicPublish("", queueName, null, msg); + } else { + boolean mandatory = publishOptions.computeMandatory(value); + boolean immediate = publishOptions.computeImmediate(value); + + if (returnListener == null && (mandatory || immediate)) { + throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener."); + } else { + String rk = publishOptions.computeRoutingKey(value); + String exchange = publishOptions.computeExchange(value); + + channel.basicPublish((exchange != null) ? exchange : "", --- End diff -- What do you think about not allowing the return of `computeRoutingKey` and `computeExchange` to be `null`. `basicPublish` does not allow `null`s and will throw an exception. This way our interfaces will be consistent with the ones from RMQ. ---
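The non-null contract proposed in this comment could be sketched roughly as follows. This is a hypothetical, simplified stand-in for the interface under review, not the actual Flink API; `PublishOptions` and `PublishTargets` are invented names, and `Objects.requireNonNull` stands in for the null checks `basicPublish` itself performs:

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the publish-options interface under review.
interface PublishOptions<T> {
    String computeRoutingKey(T value);
    String computeExchange(T value);
}

final class PublishTargets {
    // Reject nulls up front, mirroring basicPublish, which also rejects
    // null exchange/routing-key arguments.
    static <T> String[] resolve(PublishOptions<T> options, T value) {
        String exchange = Objects.requireNonNull(
                options.computeExchange(value), "computeExchange returned null");
        String routingKey = Objects.requireNonNull(
                options.computeRoutingKey(value), "computeRoutingKey returned null");
        return new String[] {exchange, routingKey};
    }
}
```

With such a contract, the `(exchange != null) ? exchange : ""` defaulting in `invoke` would become unnecessary: a null return fails immediately with a descriptive message instead of silently publishing to the default exchange.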
[GitHub] flink issue #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage of AMQP ...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5410 Thanks for updating @pduveau. There are still some minor comments on `RMQSinkPublishOptions` and `README` that are open for discussion. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193964437 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeMandatory(IN a); --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193964455 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeMandatory(IN a); + + /** +* Compute the immediate flag +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeImmediate(IN a); --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193964429 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193964402 --- Diff: flink-connectors/flink-connector-rabbitmq/README.md --- @@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client". Users that create and publish derivative work based on Flink's RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +# This version provides a mechanism to handle AMQ Messaging features --- End diff -- ping ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193963772 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { --- End diff -- I see that you have already changed it to `createRMQSinkWithOptions`. I think this works. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193963587 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -105,7 +153,18 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (messageCompute == null) { + channel.basicPublish("", queueName, null, msg); + } else { + String rk = messageCompute.computeRoutingKey(value); + String exchange = messageCompute.computeExchange(value); + channel.basicPublish((exchange != null) ? exchange : "", + (rk != null) ? rk : "", + (returnListener != null) && messageCompute.computeMandatory(value), --- End diff -- I think failing fast is a reasonable behavior. Can you also adapt the javadocs? ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193126744 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { --- End diff -- How about `createRMQSinkWithPublishOptions()`? I am not an AMQP specialist; if `RMQSinkFeatured` has a special meaning, then leave it. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193101683 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { + publishOptions = new DummyPublishOptions(); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, serializationSchema, publishOptions); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeaturedReturnHandler() throws Exception { + publishOptions = new DummyPublishOptions(); --- End diff -- `publishOptions` and `returnListener` can be local variables. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193127128 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -105,7 +153,18 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (messageCompute == null) { + channel.basicPublish("", queueName, null, msg); + } else { + String rk = messageCompute.computeRoutingKey(value); + String exchange = messageCompute.computeExchange(value); + channel.basicPublish((exchange != null) ? exchange : "", + (rk != null) ? rk : "", + (returnListener != null) && messageCompute.computeMandatory(value), --- End diff -- I would prefer that an `IllegalStateException` be thrown if the user code returns `true` without a `returnListener`. ---
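The fail-fast behavior preferred here could look roughly like the following sketch. The helper name is invented for illustration, and `returnListener` stands for the sink's configured listener field from the quoted diff:

```java
final class MandatoryFlagCheck {
    // Throw instead of silently forcing the flag to false when no return
    // listener is configured, as preferred in the review above.
    static boolean resolveMandatory(boolean requested, Object returnListener) {
        if (requested && returnListener == null) {
            throw new IllegalStateException(
                    "Setting the mandatory flag to true requires a ReturnListener");
        }
        return requested;
    }
}
```

The design trade-off: the quoted diff's `(returnListener != null) && computeMandatory(value)` masks a misconfiguration at runtime, while the fail-fast variant surfaces it on the first affected record.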
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193126347 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { + publishOptions = new DummyPublishOptions(); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, serializationSchema, publishOptions); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeaturedReturnHandler() throws Exception { --- End diff -- How about `createRMQSinkWithReturnHandler()`? ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193126169 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { + publishOptions = new DummyPublishOptions(); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, serializationSchema, publishOptions); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeaturedReturnHandler() throws Exception { + publishOptions = new DummyPublishOptions(); + returnListener = new DummyReturnHandler(); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, serializationSchema, publishOptions, returnListener); --- End diff -- nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193125374 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeMandatory(IN a); --- End diff -- How about we add a default implementation that returns RabbitMQ's default value: ``` default boolean computeMandatory(IN a) { return false; } ``` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193122068 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + private final RMQSinkPublishOptions messageCompute; + private final ReturnListener returnListener; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes +* @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties */ - public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, --- End diff -- We should add some validation to assert that `queueName` is `null` iff. `messageCompute` is set. We should also check that `RMQConnectionConfig` and `SerializationSchema` are not null. You may want to use `org.apache.flink.util.Preconditions` for that. ---
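A sketch of the suggested constructor validation. Plain `java.util.Objects` is used here in place of `org.apache.flink.util.Preconditions`, the class name is invented, and the parameters follow the names in the quoted diff:

```java
import java.util.Objects;

final class RMQSinkValidation {
    // queueName must be set exactly when no publish options are given:
    // either the sink publishes to a fixed queue, or the options compute
    // the exchange/routing key per record.
    static void validate(Object rmqConnectionConfig, Object schema,
                         String queueName, Object publishOptions) {
        Objects.requireNonNull(rmqConnectionConfig, "rmqConnectionConfig must not be null");
        Objects.requireNonNull(schema, "schema must not be null");
        if ((queueName == null) == (publishOptions == null)) {
            throw new IllegalArgumentException(
                    "exactly one of queueName and publishOptions must be set");
        }
    }
}
```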
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193119522 --- Diff: flink-connectors/flink-connector-rabbitmq/README.md --- @@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client". Users that create and publish derivative work based on Flink's RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +# This version provides a mechanism to handle AMQ Messaging features --- End diff -- I think the Javadocs of the constructors are in better shape than the contents of the `README.md`. I think we can drop this paragraph for now. What do you think? ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193070753 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -66,12 +79,19 @@ public void before() throws Exception { } @Test - public void openCallDeclaresQueue() throws Exception { + public void openCallDeclaresQueueInStandardMode() throws Exception { createRMQSink(); verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null); } + @Test + public void openCallDontDeclaresQueueInFeaturedMode() throws Exception { + doThrow(Exception.class).when(channel).queueDeclare(null, false, false, false, null); --- End diff -- There is no assertion in this test. I would re-write it as: ``` @Test public void openCallDontDeclaresQueueInFeaturedMode() throws Exception { createRMQSinkFeatured(); verify(channel, never()).queueDeclare(null, false, false, false, null); } ``` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193120662 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -105,7 +153,18 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (messageCompute == null) { + channel.basicPublish("", queueName, null, msg); + } else { + String rk = messageCompute.computeRoutingKey(value); + String exchange = messageCompute.computeExchange(value); + channel.basicPublish((exchange != null) ? exchange : "", --- End diff -- I think it is better to assume that `computeRoutingKey` and `computeExchange` must return a non-null value. It would be consistent with the `basicPublish` method which also rejects nulls. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193126219 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); --- End diff -- nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193125823 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeMandatory(IN a); + + /** +* Compute the immediate flag +* See AMQP API help for values. +* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) +* @param a The data used by the sink +* @return The mandatory flag +*/ + boolean computeImmediate(IN a); --- End diff -- Sorry I missed that RMQ does not implement this flag. We can remove it or add a Java 8 default implementation that returns `false`. ---
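Taken together, the suggestions for `computeMandatory` and `computeImmediate` amount to Java 8 default methods along these lines. `FlagOptions` is an invented, simplified stand-in; the real interface under review also declares `computeExchange` and `computeProperties` and extends `Serializable`:

```java
// Simplified stand-in for the publish-options interface; only the
// routing key and the two flag methods are shown.
interface FlagOptions<T> {
    String computeRoutingKey(T value);

    // Defaults mirror RabbitMQ's own defaults, so most implementors
    // never need to override them.
    default boolean computeMandatory(T value) { return false; }
    default boolean computeImmediate(T value) { return false; }
}
```

A side effect worth noting: with the flags defaulted, only one abstract method remains in this sketch, so an implementation can be a lambda.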
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193126189 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkFeatured() throws Exception { + publishOptions = new DummyPublishOptions(); + RMQSink rmqSink = new RMQSink(rmqConnectionConfig, serializationSchema, publishOptions); --- End diff -- nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193122883 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + private final RMQSinkPublishOptions messageCompute; + private final ReturnListener returnListener; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes +* @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties */ - public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, + RMQSinkPublishOptions messageCompute, ReturnListener returnListener) { --- End diff -- `ReturnListener` is not `Serializable`. To work around this, we can add an interface like so: ``` public interface SerializableReturnListener extends ReturnListener, Serializable { } ``` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193123998 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + String computeRoutingKey(IN a); + + /** +* Compute the message's properties from the data. +* @param a The data used by the sink +* @return The message's properties (can be null) +*/ + BasicProperties computeProperties(IN a); + + /** +* Compute the exchange from the data. 
+* @param a The data used by the sink +* @return The exchange to publish the message to +*/ + String computeExchange(IN a); + + /** +* Compute the mandatory flag used in basic.publish method --- End diff -- I think it's better to use Javadoc links: `{@link Channel#basicPublish(String, String, boolean, boolean, BasicProperties, byte[])}` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193010305 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; --- End diff -- Let's not change the visibility to `private` for now as it could break backwards compatibility. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193004950 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -109,6 +109,7 @@ private RMQConnectionConfig(String host, Integer port, String virtualHost, Strin * @param requestedChannelMax requested maximum channel number * @param requestedFrameMax requested maximum frame size * @param requestedHeartbeat requested heartbeat interval + * @param createQueue enable or diable queue create on setup --- End diff -- This parameter does not exist anymore. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193010378 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +42,63 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + private final String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; --- End diff -- Is it needed to change the visibility to `protected`? ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193006836 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -124,7 +159,82 @@ public void closeAllResources() throws Exception { verify(connection).close(); } + @Test + public void invokeFeaturedPublishBytesToQueue() throws Exception { + RMQSink rmqSink = createRMQSinkFeatured(); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test + public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws Exception { + RMQSink rmqSink = createRMQSinkFeaturedReturnHandler(); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test(expected = RuntimeException.class) + public void exceptionDuringFeaturedPublishingIsNotIgnored() throws Exception { + RMQSink rmqSink = createRMQSinkFeatured(); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + @Test + public void exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception { + RMQSink rmqSink = createRMQSinkFeatured(); + rmqSink.setLogFailuresOnly(true); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + private class DummyPublishOptions implements RMQSinkPublishOptions { --- End diff -- nit: `serialVersionUID` is 
missing. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190658100 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +public interface RMQSinkPublishOptions { + + /** +* Compute the message's routing key from the data. +* @param a The data used by the sink +* @return The routing key of the message +*/ + public String computeRoutingKey(IN a); --- End diff -- The `public` keyword is redundant here because method declarations in an interface are always `public`. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190888216 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +public interface RMQSinkPublishOptions { --- End diff -- I think the interface should be annotated with `@PublicEvolving`. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190650550 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +public interface RMQSinkPublishOptions { --- End diff -- This must extend `java.io.Serializable` because `RMQSink` has a non-transient field of this type. ---
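The requirement can be seen with a minimal illustration (not Flink code, all names are placeholders): a `Serializable` sink that holds a non-transient field can only be serialized if the field's value is serializable too, which is exactly why the options interface has to extend `java.io.Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal illustration: a Serializable sink with a non-transient field can
// only be serialized if the field's value is serializable as well. If Options
// did not extend Serializable here, writeObject would throw
// NotSerializableException when Flink ships the sink to the cluster.
public class SerializableFieldSketch {

    interface Options extends Serializable {   // mirrors "extends java.io.Serializable"
        String computeRoutingKey(String data);
    }

    static class Sink implements Serializable {
        private final Options options;          // non-transient field, shipped with the sink
        Sink(Options options) { this.options = options; }
    }

    static int serializedSize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);                 // fails if any reachable field is not serializable
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        // The lambda is serializable because its target type extends Serializable.
        Sink sink = new Sink(data -> "key." + data);
        System.out.println(serializedSize(sink) > 0); // true
    }
}
```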
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190654087 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java --- @@ -151,7 +151,9 @@ public void open(Configuration config) throws Exception { if (channel == null) { throw new RuntimeException("None of RabbitMQ channels are available"); } - setupQueue(); + if (rmqConnectionConfig.hasToCreateQueueOnSetup()) { --- End diff -- There was a PR that did something similar and got rejected. See: https://github.com/apache/flink/pull/4979 and https://issues.apache.org/jira/browse/FLINK-8018 ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190888669 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +40,36 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + protected String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + protected RMQSinkPublishOptions messageCompute; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes */ public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { - this.rmqConnectionConfig = rmqConnectionConfig; + this(rmqConnectionConfig, schema, null); this.queueName = queueName; + } + + /** +* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. +* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes +* @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties + */ + public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema schema, --- End diff -- I think the constructor should be annotated with `@PublicEvolving`, and `messageCompute` should come after `rmqConnectionConfig` to be consistent with the other constructor. ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190887946 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +public interface RMQSinkPublishOptions { --- End diff -- Shouldn't the `mandatory` flag in `basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException` be covered as well? ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190648628 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +40,36 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + protected String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + protected RMQSinkPublishOptions messageCompute; --- End diff -- This field and `queueName` should be `final`. I think you need a third private constructor to achieve this: ``` private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, RMQSinkPublishOptions messageCompute) { this.rmqConnectionConfig = rmqConnectionConfig; this.queueName = queueName; this.schema = schema; this.messageCompute = messageCompute; } ``` Then you can write ``` public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema schema, RMQSinkPublishOptions messageCompute) { this(rmqConnectionConfig, null, schema, messageCompute); } ``` ---
[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190647503 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -43,14 +49,27 @@ public class RMQSinkTest { private static final String QUEUE_NAME = "queue"; + private static final String EXCHANGE = "exchange"; + private static final String ROUTING_KEY = "application.component.error"; + private static final String EXPIRATION = "1"; private static final String MESSAGE_STR = "msg"; private static final byte[] MESSAGE = new byte[1]; + private static Map<String, Object> headers = new HashMap<String, Object>(); + private static AMQP.BasicProperties props; --- End diff -- `headers.put("Test", new String("My Value"));` can be simplified to `headers.put("Test", "My Value");` The static field `headers` is mutable state, which should be avoided. Why not just: ``` private static AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(Collections.singletonMap("Test", "My Value")) .expiration(EXPIRATION) .build(); ``` Then the static initializer is not even needed. ---
[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6069#discussion_r190605670 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -314,7 +319,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) log.info("Requesting blob server port."); CompletableFuture portFuture = sendRequest( - BlobServerPortHeaders.getInstance()); + BlobServerPortHeaders.getInstance(), + EmptyMessageParameters.getInstance()); --- End diff -- This is not needed. There should be an overload of `sendRequest` that does it. ---
[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6069#discussion_r190605113 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -274,11 +275,17 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) final JobMessageParameters params = new JobMessageParameters(); params.jobPathParameter.resolve(jobId); - CompletableFuture responseFuture = sendRequest(detailsHeaders, params); + CompletableFuture responseFuture = sendRequest( + detailsHeaders, + params); return responseFuture.thenApply(JobDetailsInfo::getJobStatus); } + private Predicate isConnectionProblemOrServiceUnavailable() { --- End diff -- nit: can be `static` and defined closer to `isConnectionProblemException` and `isServiceUnavailable` ---
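Independently of where the method ends up being defined, the combined check can be built by composing the two existing predicates. A generic sketch with `java.util.function.Predicate` (the method name mirrors the review comment; the `instanceof` checks are placeholders for Flink's actual connection-problem and 503 checks):

```java
import java.util.function.Predicate;

// Sketch of grouping the two retry conditions into one static predicate.
// The instanceof checks below are placeholders, not Flink's real checks.
public class RetryPredicateSketch {
    private static final Predicate<Throwable> IS_CONNECTION_PROBLEM =
        t -> t instanceof java.net.ConnectException;
    private static final Predicate<Throwable> IS_SERVICE_UNAVAILABLE =
        t -> t instanceof IllegalStateException;   // stand-in for an HTTP 503 check

    // Static, and defined right next to the predicates it composes.
    static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        return IS_CONNECTION_PROBLEM.or(IS_SERVICE_UNAVAILABLE);
    }

    public static void main(String[] args) {
        Predicate<Throwable> retryable = isConnectionProblemOrServiceUnavailable();
        System.out.println(retryable.test(new java.net.ConnectException())); // true
        System.out.println(retryable.test(new RuntimeException()));          // false
    }
}
```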
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190603900 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) { allocationId, jobId, resourceManagerId); try { - if (resourceManagerConnection == null) { - final String message = "TaskManager is not connected to a resource manager."; + if (!isConnectedToResourceManager(resourceManagerId)) { + final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); - throw new SlotAllocationException(message); - } - - if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { - final String message = "The leader id " + resourceManagerId + - " does not match with the leader id of the connected resource manager " + - resourceManagerConnection.getTargetLeaderId() + '.'; - - log.debug(message); - throw new SlotAllocationException(message); + throw new TaskManagerException(message); --- End diff -- We already talked about it offline. ---
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190603365 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio } } + /** +* Tests that we ignore slot requests if the TaskExecutor is not +* registered at a ResourceManager. +*/ + @Test + public void testIgnoringSlotRequestsIfNotRegistered() throws Exception { + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build(); + + final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final CompletableFuture registrationFuture = new CompletableFuture<>(); + final CompletableFuture taskExecutorResourceIdFuture = new CompletableFuture<>(); + + testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> { + taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1); +return registrationFuture; +}); + + rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final ResourceID resourceId = taskExecutorResourceIdFuture.get(); + + final SlotID slotId = new SlotID(resourceId, 0); + final CompletableFuture slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout); + + try { + slotRequestResponse.get(); + fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class)); + } + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + /** +* Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it +* was explicitly disconnected. +*/ + @Test + public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception { + final long heartbeatInterval = 1000L; + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration(configuration), + haServices, + new TaskManagerServicesBuilder() + .setTaskSlotTable(taskSlotTable) + .setTaskManagerLocation(taskManagerLocation) + .build(), + new HeartbeatServices(heartbeatInterval, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234); + final CompletableFuture registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.gener
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190603934 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) { blobCacheService.setBlobServerAddress(blobServerAddress); + establishedResourceManagerConnection = new EstablishedResourceManagerConnection( + resourceManagerGateway, + resourceManagerResourceId, + taskExecutorRegistrationId); + stopRegistrationTimeout(); } private void closeResourceManagerConnection(Exception cause) { - if (resourceManagerConnection != null) { - - if (resourceManagerConnection.isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId(), cause); - } else { - log.info("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId()); - } - resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId()); + if (establishedResourceManagerConnection != null) { + final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId(); - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); + if (log.isDebugEnabled()) { + log.debug("Close ResourceManager connection {}.", --- End diff -- We already talked about it offline. ---
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190583266 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) { blobCacheService.setBlobServerAddress(blobServerAddress); + establishedResourceManagerConnection = new EstablishedResourceManagerConnection( + resourceManagerGateway, + resourceManagerResourceId, + taskExecutorRegistrationId); + stopRegistrationTimeout(); } private void closeResourceManagerConnection(Exception cause) { - if (resourceManagerConnection != null) { - - if (resourceManagerConnection.isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId(), cause); - } else { - log.info("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId()); - } - resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId()); + if (establishedResourceManagerConnection != null) { + final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId(); - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); + if (log.isDebugEnabled()) { + log.debug("Close ResourceManager connection {}.", --- End diff -- I was wondering whether `cause` can get logged twice: 1. ``` log.debug("Close ResourceManager connection {}.", resourceManagerResourceId, cause); ``` 2. ``` log.debug("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause); ``` ---
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190557380 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) { allocationId, jobId, resourceManagerId); try { - if (resourceManagerConnection == null) { - final String message = "TaskManager is not connected to a resource manager."; + if (!isConnectedToResourceManager(resourceManagerId)) { + final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); - throw new SlotAllocationException(message); - } - - if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { - final String message = "The leader id " + resourceManagerId + - " does not match with the leader id of the connected resource manager " + - resourceManagerConnection.getTargetLeaderId() + '.'; - - log.debug(message); - throw new SlotAllocationException(message); + throw new TaskManagerException(message); --- End diff -- Why not return an exceptional future here? ---
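Returning an exceptional future keeps the failure on the RPC result path instead of throwing into the caller's thread. A plain-`CompletableFuture` sketch of the idea (the method signature and message are placeholders, not the actual `TaskExecutor` code; Flink's `FutureUtils` offers a similar shorthand for building an already-failed future):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch: instead of throwing from an RPC method, complete the returned
// future exceptionally so the failure travels back to the remote caller.
public class ExceptionalFutureSketch {

    static CompletableFuture<String> requestSlot(boolean connectedToResourceManager) {
        if (!connectedToResourceManager) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("TaskManager is not connected to the resource manager."));
            return failed;
        }
        return CompletableFuture.completedFuture("slot-allocated");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(requestSlot(true).get()); // slot-allocated

        try {
            requestSlot(false).get();
        } catch (ExecutionException e) {
            // The original cause is preserved for the caller to inspect.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```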
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190602158 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio } } + /** +* Tests that we ignore slot requests if the TaskExecutor is not +* registered at a ResourceManager. +*/ + @Test + public void testIgnoringSlotRequestsIfNotRegistered() throws Exception { + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build(); + + final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final CompletableFuture registrationFuture = new CompletableFuture<>(); + final CompletableFuture taskExecutorResourceIdFuture = new CompletableFuture<>(); + + testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> { + taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1); +return registrationFuture; +}); + + rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final ResourceID resourceId = taskExecutorResourceIdFuture.get(); + + final SlotID slotId = new SlotID(resourceId, 0); + final CompletableFuture slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout); + + try { + slotRequestResponse.get(); + fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class)); + } + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + /** +* Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it +* was explicitly disconnected. +*/ + @Test + public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception { + final long heartbeatInterval = 1000L; + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration(configuration), + haServices, + new TaskManagerServicesBuilder() + .setTaskSlotTable(taskSlotTable) + .setTaskManagerLocation(taskManagerLocation) + .build(), + new HeartbeatServices(heartbeatInterval, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234); + final CompletableFuture registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.gener
[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190602807 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java --- @@ -1139,6 +1147,61 @@ public void testReportAllocatedSlot() throws Exception { } } + /** +* Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call +* fails. +*/ + @Test + public void testSlotRequestFailure() throws Exception { + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) { + + final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); + slotManager.registerSlotRequest(slotRequest); + + final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1); + final BlockingQueue<CompletableFuture> responseQueue = new ArrayBlockingQueue<>(1); + + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> { + requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5); --- End diff -- I think you are using spaces for indentation here. ---
[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6068#discussion_r190500268 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java --- @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { assertThat(deleteAllFuture.isDone(), is(false)); } + /** +* Tests that the {@link RunningJobsRegistry} entries are cleared after the +* job reached a terminal state. +*/ + @Test + public void testRunningJobsRegistryCleanup() throws Exception { + submitJob(); + + runningJobsRegistry.setJobRunning(jobId); + assertThat(runningJobsRegistry.contains(jobId), is(true)); + + resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build()); + + // wait for the clearing + runningJobsRegistry.getClearedFuture().get(); + + assertThat(runningJobsRegistry.contains(jobId), is(false)); + } + + private static final class TestingRunningJobsRegistry implements RunningJobsRegistry { + + private final JobID jobId; + + private final CompletableFuture clearedFuture = new CompletableFuture<>(); + + private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING; + + private boolean containsJob = false; + + private TestingRunningJobsRegistry(JobID jobId) { + this.jobId = jobId; + } + + public CompletableFuture getClearedFuture() { + return clearedFuture; + } + + @Override + public void setJobRunning(JobID jobID) throws IOException { + checkJobId(jobID); + containsJob = true; + jobSchedulingStatus = JobSchedulingStatus.RUNNING; + } + + private void checkJobId(JobID jobID) { + Preconditions.checkArgument(jobId.equals(jobID)); --- End diff -- Not the best variable names here: `jobId` vs. `jobID` ---
[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6068#discussion_r190548385 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java --- @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { assertThat(deleteAllFuture.isDone(), is(false)); } + /** +* Tests that the {@link RunningJobsRegistry} entries are cleared after the +* job reached a terminal state. +*/ + @Test + public void testRunningJobsRegistryCleanup() throws Exception { + submitJob(); + + runningJobsRegistry.setJobRunning(jobId); + assertThat(runningJobsRegistry.contains(jobId), is(true)); + + resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build()); + + // wait for the clearing + runningJobsRegistry.getClearedFuture().get(); + + assertThat(runningJobsRegistry.contains(jobId), is(false)); + } + + private static final class TestingRunningJobsRegistry implements RunningJobsRegistry { + + private final JobID jobId; + + private final CompletableFuture clearedFuture = new CompletableFuture<>(); --- End diff -- For simplicity I would use a `CountDownLatch` here. ---
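The rationale for preferring a `CountDownLatch` is that the test only needs a one-shot "it happened" signal, not a result value, which is all a `CompletableFuture` adds here. A minimal sketch of the suggested shape (the registry class name and methods are illustrative, not the actual test code):

```java
import java.util.concurrent.CountDownLatch;

public class ClearedLatchDemo {

    // Illustrative simplified registry: a CountDownLatch(1) models the
    // one-shot "cleared" event without carrying a value.
    static class SingleRunningJobRegistry {
        private final CountDownLatch clearedLatch = new CountDownLatch(1);

        void clearJob() {
            clearedLatch.countDown();
        }

        void awaitCleared() throws InterruptedException {
            clearedLatch.await();
        }
    }

    public static void main(String[] args) throws Exception {
        SingleRunningJobRegistry registry = new SingleRunningJobRegistry();
        Thread cleaner = new Thread(registry::clearJob);
        cleaner.start();
        registry.awaitCleared(); // blocks until clearJob() has run
        cleaner.join();
        System.out.println("registry cleared");
    }
}
```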
[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6068#discussion_r190548607 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java --- @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { assertThat(deleteAllFuture.isDone(), is(false)); } + /** +* Tests that the {@link RunningJobsRegistry} entries are cleared after the +* job reached a terminal state. +*/ + @Test + public void testRunningJobsRegistryCleanup() throws Exception { + submitJob(); + + runningJobsRegistry.setJobRunning(jobId); + assertThat(runningJobsRegistry.contains(jobId), is(true)); + + resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build()); + + // wait for the clearing + runningJobsRegistry.getClearedFuture().get(); + + assertThat(runningJobsRegistry.contains(jobId), is(false)); + } + + private static final class TestingRunningJobsRegistry implements RunningJobsRegistry { --- End diff -- Maybe `SingleRunningJobRegistry`. ---
[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6035#discussion_r189004536 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) { runAsync(() -> { log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); - closeResourceManagerConnection( - new TimeoutException( - "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); + if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) { + final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress(); --- End diff -- Declaration and assignment can be moved closer to `createResourceManagerConnection`. ---
[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6035#discussion_r189012246 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) { resourceManagerConnection.close(); resourceManagerConnection = null; } + + startRegistrationTimeout(); + } + + private void startRegistrationTimeout() { + final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration(); + + if (maxRegistrationDuration != null) { + final UUID newRegistrationTimeoutId = UUID.randomUUID(); + currentRegistrationTimeoutId = newRegistrationTimeoutId; + scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration); + } + } + + private void stopRegistrationTimeout() { + currentRegistrationTimeoutId = null; + } + + private void registrationTimeout(@Nonnull UUID registrationTimeoutId) { + if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) { + final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration(); + + onFatalError( + new RegistrationTimeoutException( + String.format("Could not register at the ResourceManager within the specified maximum " + + "registration duration %s. This indicates a problem with this instance. Terminating now.", + maxRegistrationDuration))); + } else { + log.debug("Ignoring outdated registration timeout."); --- End diff -- I think this will be logged even if the registration succeeded. ---
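The observation holds because `stopRegistrationTimeout()` clears `currentRegistrationTimeoutId` on success, so a later-firing timeout lands in the same `else` branch as a genuinely superseded one. A condensed sketch of the generation-token pattern from the diff (class and method names simplified for illustration; return values stand in for the fatal-error / debug-log actions):

```java
import java.util.Objects;
import java.util.UUID;

public class RegistrationTimeoutDemo {

    // Each (re)start allocates a fresh token; a firing timeout only acts if
    // the token it captured is still the current one.
    private UUID currentRegistrationTimeoutId;

    UUID startRegistrationTimeout() {
        UUID token = UUID.randomUUID();
        currentRegistrationTimeoutId = token;
        return token;
    }

    void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null;
    }

    // What the timeout callback would do when it fires with `token`.
    String onRegistrationTimeout(UUID token) {
        if (Objects.equals(token, currentRegistrationTimeoutId)) {
            return "fatal: registration timed out";
        }
        // Reached both for superseded tokens and after a successful
        // registration cleared the current token -- hence the remark that
        // "outdated" is logged even when registration succeeded.
        return "ignore: outdated or cancelled timeout";
    }

    public static void main(String[] args) {
        RegistrationTimeoutDemo demo = new RegistrationTimeoutDemo();
        UUID first = demo.startRegistrationTimeout();
        UUID second = demo.startRegistrationTimeout(); // supersedes `first`
        System.out.println(demo.onRegistrationTimeout(first));  // superseded
        System.out.println(demo.onRegistrationTimeout(second)); // still current
        demo.stopRegistrationTimeout();                         // registration succeeded
        System.out.println(demo.onRegistrationTimeout(second)); // also ignored now
    }
}
```

The sketch makes the reviewed point concrete: the final call prints the "outdated" branch even though nothing was outdated, only cancelled, so a more precise debug message would distinguish the two cases.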