[GitHub] flink pull request #6419: [FLINK-9949][tests] Kill Flink processes in DB/tea...

2018-07-25 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6419

[FLINK-9949][tests] Kill Flink processes in DB/teardown

## What is the purpose of the change

*Not killing Flink processes at the end of a test can cause interference
with subsequent test runs.*

## Brief change log
  - *Kill Flink processes in `DB/teardown!`.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Ran tests in docker.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9949

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6419


commit db3cecb5c9fb16d707e02b436244ba8fd5ee1ce8
Author: gyao 
Date:   2018-07-25T13:28:40Z

[FLINK-9949][tests] Kill Flink processes in DB/teardown




---


[GitHub] flink pull request #6418: [FLINK-9939][runtime] Mesos: Not setting TMP dirs ...

2018-07-25 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6418

[FLINK-9939][runtime] Mesos: Not setting TMP dirs causes NPE

## What is the purpose of the change

*This fixes a possible NPE when deploying on Mesos.*

## Brief change log

  - *Add null check to `BootstrapTools.updateTmpDirectoriesInConfiguration(...)`,
and add a `@Nullable` annotation (sketched below).*
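
A minimal sketch of the guard described above, assuming `CoreOptions.TMP_DIRS` as the affected option (the body is an illustration, not the exact patch):

```
import javax.annotation.Nullable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

static void updateTmpDirectoriesInConfiguration(
        Configuration configuration,
        @Nullable String defaultDirs) {
    if (configuration.contains(CoreOptions.TMP_DIRS)) {
        // tmp dirs were configured explicitly; keep them
    } else if (defaultDirs != null) {
        configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
    }
    // before the fix, a null defaultDirs could reach setString and
    // trigger the NPE described above
}
```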

## Verifying this change

This change added tests and can be verified as follows:
  - *Added unit test to assert that `updateTmpDirectoriesInConfiguration` 
can handle `null` values.*
  - *Manually deployed on Mesos.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9939

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6418.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6418


commit eb03ada77db7633d330be7847dbdf9ee801a9bee
Author: gyao 
Date:   2018-07-25T10:23:20Z

[hotfix][tests] Fix checkstyle violations in BootstrapToolsTest.

commit 261d4d7423b9ca179ac0004625f51b7b71655d63
Author: gyao 
Date:   2018-07-25T11:57:34Z

[FLINK-9939][runtime] Add null check before setting tmp dirs in config.




---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6406

[FLINK-9159][runtime] Sanity check default timeout values

## What is the purpose of the change

*Set the default timeouts for resource release to sane values. Consolidate 
config keys and documentation.*

## Brief change log

  - *Set default value of `mesos.failover-timeout` to 1 week.*
  - *Deprecate config key `slotmanager.request-timeout` (deprecation sketched below).*
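
For illustration, the deprecation typically looks like this (a sketch; the default value shown is an assumption, see also the commit messages below):

```
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

// values written under the old key are still picked up via the new option
public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
    key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L) // assumed default: 5 minutes in ms
        .withDeprecatedKeys("slotmanager.request-timeout");
```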


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test `SlotManagerConfigurationTest` to verify that slot request 
timeouts are set correctly.*
  - *Manually deployed on Mesos 1.5.0.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6406


commit 96ef18e322c5623a609382057985b35971ba3234
Author: gyao 
Date:   2018-07-24T13:38:47Z

[FLINK-9159][mesos] Set default value of mesos.failover-timeout to 1 week.

commit 112122912d7dd78c612c1648f3e2b041ae65afa6
Author: gyao 
Date:   2018-07-24T13:48:27Z

[FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout

- Replace config key slotmanager.request-timeout with slot.request.timeout 
because
both keys have effectively the same semantics.
- Rename key slotmanager.taskmanager-timeout to
resourcemanager.taskmanager-timeout.

commit 787f7c1480a5676e7ce52092265b3cd051064e3c
Author: gyao 
Date:   2018-07-24T13:55:16Z

[hotfix][docs] Add -DskipTests flag to command that generates docs.




---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204400729
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void 
testSlotReleasingFailsSchedulingOperation() throws Exception {
assertThat(executionGraph.getTerminationFuture().get(), 
is(JobStatus.FAILED));
}
 
+   /**
+* Tests that all slots are being returned to the {@link SlotOwner} if 
the
+* {@link ExecutionGraph} is being cancelled. See FLINK-9908
+*/
+   @Test
+   public void testCancellationOfIncompleteScheduling() throws Exception {
+   final int parallelism = 10;
+
+   final JobVertex jobVertex = new JobVertex("Test job vertex");
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+   jobVertex.setParallelism(parallelism);
+
+   final JobGraph jobGraph = new JobGraph(jobVertex);
+   jobGraph.setAllowQueuedScheduling(true);
+   jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+   final TestingSlotOwner slotOwner = new TestingSlotOwner();
+   final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+   final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism);
+   final CountDownLatch requestedSlotsLatch = new 
CountDownLatch(parallelism);
+
+   final TestingSlotProvider slotProvider = new 
TestingSlotProvider(
+   (SlotRequestId slotRequestId) -> {
+   slotRequestIds.put(slotRequestId, 1);
+   requestedSlotsLatch.countDown();
+   return new CompletableFuture<>();
+   });
+
+
+   final ExecutionGraph executionGraph = 
createExecutionGraph(jobGraph, slotProvider);
+
+   executionGraph.scheduleForExecution();
+
+   // wait until we have requested all slots
+   requestedSlotsLatch.await();
+
+   final ExpectedSlotRequestIds expectedSlotRequestIds = new 
ExpectedSlotRequestIds(slotRequestIds.keySet());
+   slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> 
expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId()));
+   
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId);
+
+   final OneShotLatch slotRequestsBeingFulfilled = new 
OneShotLatch();
+
+   // start completing the slot requests asynchronously
+   executor.execute(
+   () -> {
+   slotRequestsBeingFulfilled.trigger();
+
+   for (SlotRequestId slotRequestId : 
slotRequestIds.keySet()) {
+   final SingleLogicalSlot 
singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, 
slotRequestId);
+   slotProvider.complete(slotRequestId, 
singleLogicalSlot);
+   }
+   });
+
+   // make sure that we complete cancellations of deployed tasks
+   taskManagerGateway.setCancelConsumer(
+   (ExecutionAttemptID executionAttemptId) -> {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttemptId);
+
+   // if the execution was cancelled in state 
SCHEDULING, then it might already have been removed
+   if (execution != null) {
+   execution.cancelingComplete();
+   }
+   }
+   );
+
+   slotRequestsBeingFulfilled.await();
+
+   executionGraph.cancel();
+
+   expectedSlotRequestIds.waitForAllSlotRequestIds();
+   }
+
+   private static final class ExpectedSlotRequestIds {
--- End diff --

I think this is fine but I would have probably used a synchronized set and 
a `CountDownLatch`. It would allow for timeouts and also asserting on which ids 
are missing.
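
A rough sketch of that alternative (names and the timeout value are illustrative, not from the PR):

```
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;

final class ExpectedIds {
    private final Set<SlotRequestId> outstanding;
    private final CountDownLatch allReturned;

    ExpectedIds(Collection<SlotRequestId> ids) {
        this.outstanding = Collections.synchronizedSet(new HashSet<>(ids));
        this.allReturned = new CountDownLatch(outstanding.size());
    }

    void notifySlotRequestId(SlotRequestId id) {
        if (outstanding.remove(id)) {
            allReturned.countDown();
        }
    }

    void awaitAll() throws InterruptedException {
        // await with a timeout, and report exactly which ids are missing
        assertTrue("Missing slot request ids: " + outstanding,
            allReturned.await(10, TimeUnit.SECONDS));
    }
}
```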


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204399866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void 
testSlotReleasingFailsSchedulingOperation() throws Exception {
assertThat(executionGraph.getTerminationFuture().get(), 
is(JobStatus.FAILED));
}
 
+   /**
+* Tests that all slots are being returned to the {@link SlotOwner} if 
the
+* {@link ExecutionGraph} is being cancelled. See FLINK-9908
+*/
+   @Test
+   public void testCancellationOfIncompleteScheduling() throws Exception {
+   final int parallelism = 10;
+
+   final JobVertex jobVertex = new JobVertex("Test job vertex");
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+   jobVertex.setParallelism(parallelism);
+
+   final JobGraph jobGraph = new JobGraph(jobVertex);
+   jobGraph.setAllowQueuedScheduling(true);
+   jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+   final TestingSlotOwner slotOwner = new TestingSlotOwner();
+   final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+   final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism);
+   final CountDownLatch requestedSlotsLatch = new 
CountDownLatch(parallelism);
+
+   final TestingSlotProvider slotProvider = new 
TestingSlotProvider(
+   (SlotRequestId slotRequestId) -> {
+   slotRequestIds.put(slotRequestId, 1);
+   requestedSlotsLatch.countDown();
+   return new CompletableFuture<>();
+   });
+
+
+   final ExecutionGraph executionGraph = 
createExecutionGraph(jobGraph, slotProvider);
+
+   executionGraph.scheduleForExecution();
+
+   // wait until we have requested all slots
+   requestedSlotsLatch.await();
+
+   final ExpectedSlotRequestIds expectedSlotRequestIds = new 
ExpectedSlotRequestIds(slotRequestIds.keySet());
+   slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> 
expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId()));
+   
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId);
+
+   final OneShotLatch slotRequestsBeingFulfilled = new 
OneShotLatch();
--- End diff --

What are the benefits of `OneShotLatch` compared to `new 
CountDownLatch(1)`? On first glance it seems like an unnecessary 
re-implementation.
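
For comparison, a plain `CountDownLatch(1)` already behaves like a one-shot latch (illustrative snippet, not Flink code):

```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OneShotExample {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);

        // triggering side, comparable to OneShotLatch#trigger
        new Thread(latch::countDown).start();

        // waiting side, comparable to OneShotLatch#await, with a timeout
        if (!latch.await(10, TimeUnit.SECONDS)) {
            throw new AssertionError("latch was not triggered in time");
        }
    }
}
```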


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204394265
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -384,6 +393,8 @@ private MultiTaskSlot(
MultiTaskSlot allocateMultiTaskSlot(SlotRequestId 
slotRequestId, AbstractID groupId) {
Preconditions.checkState(!super.contains(groupId));
 
+   LOG.debug("Create nested multi task slot [{}] in parent 
multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), 
groupId);
--- End diff --

add `[]`
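
I.e., presumably:

```
LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group [{}].",
    slotRequestId, getSlotRequestId(), groupId);
```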


---


[GitHub] flink issue #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failAllocati...

2018-07-23 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6386
  
https://github.com/apache/flink/pull/6369 should also be merged after this


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204384134
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -983,7 +983,7 @@ private void startCheckpointScheduler(final 
CheckpointCoordinator checkpointCoor
 
@Override
public void notifyAllocationFailure(AllocationID allocationID, 
Exception cause) {
-   slotPool.failAllocation(allocationID, cause);
+   slotPoolGateway.failAllocation(allocationID, cause);
--- End diff --

The changes don't match the commit message.


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204373664
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -323,7 +323,7 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Allocating slot with request {} for task execution 
{}", slotRequestId, task.getTaskToExecute());
+   log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
--- End diff --

When should the value be enclosed in `[]`?


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204368060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -1011,7 +1011,7 @@ public void failAllocation(final AllocationID 
allocationID, final Exception caus
failPendingRequest(pendingRequest, cause);
}
else if (availableSlots.tryRemove(allocationID)) {
-   log.debug("Failed available slot [{}] with ", 
allocationID, cause);
+   log.debug("Failed available slot with allocation id 
{}.", allocationID, cause);
--- End diff --

Why remove *allocation id*?


---


[GitHub] flink pull request #6369: [FLINK-9892][tests] Disable local recovery in Jeps...

2018-07-19 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6369

[FLINK-9892][tests] Disable local recovery in Jepsen tests.

## What is the purpose of the change

*Until FLINK-9635 is fixed, local recovery should be disabled in the Jepsen 
tests.*

## Brief change log

  - *Disable local recovery in Jepsen tests.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9892

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6369


commit 6ac6963d4770d2b10283d14e55ff07a210cfa2c2
Author: gyao 
Date:   2018-07-19T07:35:44Z

[FLINK-9892][tests] Disable local recovery in Jepsen tests.




---


[GitHub] flink pull request #6368: [FLINK-9890] Remove obsolete class ResourceManager...

2018-07-19 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6368

[FLINK-9890] Remove obsolete class ResourceManagerConfiguration

## What is the purpose of the change

*The configuration values are effectively not used. This commit removes the 
class
and all its usages.*


## Brief change log
  - *Remove ResourceManagerConfiguration and all its usages.*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9890

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6368


commit 50e05c436511c84d58561a6909520e28666f7358
Author: gyao 
Date:   2018-07-18T14:23:59Z

[FLINK-9890][Distributed Coordination] Remove obsolete class 
ResourceManagerConfiguration

The configuration values are effectively not used. This commit removes the 
class
and all its usages.




---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-17 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6297
  
@dawidwys Please merge.


---


[GitHub] flink issue #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6320
  
I tested the tooling around docker-compose and it works for me when using 
the local distribution.


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202351378
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
--- End diff --

There are some args missing to make these commands work. Is it intentional? 
 


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202348473
  
--- Diff: flink-container/docker/build.sh ---
@@ -0,0 +1,128 @@
+#!/bin/sh
+

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+usage() {
+  cat <<HERE
+Usage:
+  build.sh --job-jar <path-to-job-jar> --from-local-dist [--image-name <image>]
+  build.sh --job-jar <path-to-job-jar> --from-archive <path-to-archive> [--image-name <image>]
+  build.sh --job-jar <path-to-job-jar> --from-release --flink-version <flink-version> --hadoop-version <hadoop-version> --scala-version <scala-version> [--image-name <image>]
+  build.sh --help
+
+  If the --image-name flag is not used the built image name will be 'flink'.
+HERE
+  exit 1
+}
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+--job-jar)
+JOB_JAR_PATH="$2"
+shift
+;;
+--from-local-dist)
+FROM_LOCAL="true"
+;;
+--from-archive)
+FROM_ARCHIVE="$2"
+shift
+;;
+--from-release)
+FROM_RELEASE="true"
+;;
+--image-name)
+IMAGE_NAME="$2"
+shift
+;;
+--flink-version)
+FLINK_VERSION="$2"
+shift
+;;
+--hadoop-version)
+HADOOP_VERSION="$(echo "$2" | sed 's/\.//')"
+shift
+;;
+--scala-version)
+SCALA_VERSION="$2"
+shift
+;;
+--kubernetes-certificates)
+CERTIFICATES_DIR="$2"
+shift
+;;
+--help)
+usage
+;;
+*)
+# unknown option
+;;
+  esac
+  shift
+done
+
+IMAGE_NAME=${IMAGE_NAME:-flink-job}
+
+# TMPDIR must be contained within the working directory so it is part of 
the
+# Docker context. (i.e. it can't be mktemp'd in /tmp)
+TMPDIR=_TMP_
+
+cleanup() {
+rm -rf "${TMPDIR}"
+}
+trap cleanup EXIT
+
+mkdir -p "${TMPDIR}"
+
+JOB_JAR_TARGET="${TMPDIR}/job.jar"
+cp ${JOB_JAR_PATH} ${JOB_JAR_TARGET}
+
+if [ -n "${FROM_RELEASE}" ]; then
+
+  [[ -n "${FLINK_VERSION}" ]] && [[ -n "${HADOOP_VERSION}" ]] && [[ -n 
"${SCALA_VERSION}" ]] || usage
+
+  FLINK_BASE_URL="$(curl -s 
https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/"
+  
FLINK_DIST_FILE_NAME="flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz"
+  CURL_OUTPUT="${TMPDIR}/${FLINK_DIST_FILE_NAME}"
+
+  echo "Downloading ${FLINK_DIST_FILE_NAME} from ${FLINK_BASE_URL}"
+  curl -s ${FLINK_BASE_URL}${FLINK_DIST_FILE_NAME} --output ${CURL_OUTPUT}
--- End diff --

I think it's a bit surprising that the first command (1.5.0) will fail while 
the second (1.5.1) works, presumably because the mirror returned by closer.cgi 
only hosts the most recent release:
`build.sh --job-jar ./job.jar --from-release --flink-version 1.5.0 
--hadoop-version 28 --scala-version 2.11`
`build.sh --job-jar ./job.jar --from-release --flink-version 1.5.1 
--hadoop-version 28 --scala-version 2.11`


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202336290
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
--- End diff --

nit: The shebang in `build.sh` already tells the program loader to use 
`sh`. 


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202332741
  
--- Diff: flink-container/pom.xml ---
@@ -0,0 +1,67 @@
+

[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202332323
  
--- Diff: 
flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}.
+ */
+public class StandaloneJobClusterConfigurationParserFactoryTest extends 
TestLogger {
+
+   @Test
+   public void testEntrypointClusterConfigurationParsing() throws 
FlinkParseException {
+   final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final String key = "key";
+   final String value = "value";
+   final int restPort = 1234;
+   final String jobClassName = "foobar";
+   final String arg1 = "arg1";
+   final String arg2 = "arg2";
+   final String[] args = {"--configDir", configDir, 
"--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, 
String.format("-D%s=%s", key, value), arg1, arg2};
+
+   final StandaloneJobClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   assertThat(clusterConfiguration.getJobClassName(), 
is(equalTo(jobClassName)));
+   assertThat(clusterConfiguration.getRestPort(), 
is(equalTo(restPort)));
+   final Properties dynamicProperties = 
clusterConfiguration.getDynamicProperties();
+
+   assertThat(dynamicProperties, hasEntry(key, value));
+
+   assertThat(clusterConfiguration.getArgs(), 
arrayContaining(arg1, arg2));
+   }
+
+   @Test
+   public void testOnlyRequiredArguments() throws FlinkParseException {
+   final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final String jobClassName = "foobar";
+   final String[] args = {"--configDir", configDir, 
"--job-classname", jobClassName};
+
+   final StandaloneJobClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   assertThat(clusterConfiguration.getJobClassName(), 
is(equalTo(jobClassName)));
+   assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
+   }
+
+   @Test(expected = FlinkParseException.class)
+   public void testMissingRequiredArgument() throws FlinkParseException {
+   final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+   final String[] args = {};
+
+   commandLineParser.parse(args);
+   fail("Expected an FlinkParseException.");
--- End diff --

Superfluous `fail`
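
With `@Test(expected = FlinkParseException.class)`, the test already fails if `parse` returns normally, so the minimal form would be (sketch):

```
@Test(expected = FlinkParseException.class)
public void testMissingRequiredArgument() throws FlinkParseException {
    // if parse(...) does not throw, the expected-exception rule fails
    // the test on its own; fail(...) adds nothing
    commandLineParser.parse(new String[] {});
}
```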


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202299810
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link EntrypointClusterConfigurationParserFactory}.
+ */
+public class EntrypointClusterConfigurationParserFactoryTest extends 
TestLogger {
+
+   @Test
+   public void testEntrypointClusterConfigurationParsing() throws 
FlinkParseException {
+   final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final int restPort = 1234;
+   final String key = "key";
+   final String value = "value";
+   final String arg1 = "arg1";
+   final String arg2 = "arg2";
+   final String[] args = {"--configDir", configDir, "-r", 
String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+
+   final EntrypointClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   assertThat(clusterConfiguration.getRestPort(), 
is(equalTo(restPort)));
+   final Properties dynamicProperties = 
clusterConfiguration.getDynamicProperties();
+
+   assertThat(dynamicProperties, hasEntry(key, value));
+
+   assertThat(clusterConfiguration.getArgs(), 
arrayContaining(arg1, arg2));
+   }
+
+   @Test
+   public void testOnlyRequiredArguments() throws FlinkParseException {
+   final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final String[] args = {"--configDir", configDir};
+
+   final EntrypointClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
+   }
+
+   @Test(expected = FlinkParseException.class)
+   public void testMissingRequiredArgument() throws FlinkParseException {
+   final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
+   final String[] args = {};
+
+   commandLineParser.parse(args);
+   fail("Expected an FlinkParseException.");
--- End diff --

With `@Test(expected = FlinkParseException.class)`, `fail` is superfluous here.

```
@Test(expected = FlinkParseException.class)
public void testMissingRequiredArgument() throws FlinkParseException {
    final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
    final String[] args = {};

    commandLineParser.parse(args);
}
```


---

[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202300463
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link EntrypointClusterConfigurationParserFactory}.
+ */
+public class EntrypointClusterConfigurationParserFactoryTest extends 
TestLogger {
+
+   @Test
+   public void testEntrypointClusterConfigurationParsing() throws 
FlinkParseException {
+   final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
--- End diff --

Could be moved to a `setUp` method.
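
E.g., a JUnit 4 fixture (sketch):

```
private CommandLineParser<EntrypointClusterConfiguration> commandLineParser;

@Before
public void setUp() {
    // replaces the construction repeated in every test method
    commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
}
```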


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202305883
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
 ---
@@ -18,27 +18,43 @@
 
 package org.apache.flink.runtime.entrypoint;
 
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
+import java.util.Properties;
 
 /**
  * Configuration class which contains the parsed command line arguments for
  * the {@link ClusterEntrypoint}.
  */
 public class ClusterConfiguration {
+
+   @Nonnull
private final String configDir;
 
-   private final int restPort;
+   @Nonnull
+   private final Properties dynamicProperties;
+
+   @Nonnull
+   private final String[] args;
 
-   public ClusterConfiguration(String configDir, int restPort) {
-   this.configDir = Preconditions.checkNotNull(configDir);
-   this.restPort = restPort;
+   public ClusterConfiguration(@Nonnull String configDir, @Nonnull 
Properties dynamicProperties, @Nonnull String[] args) {
--- End diff --

`@Nonnull` is not used consistently, e.g., `FlinkParseException` is not 
annotated. I think it's easier to assume everything is non-null by default, and 
the rest should be annotated with `@Nullable`.
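
Under that convention the common case needs no annotations at all (sketch):

```
import static org.apache.flink.util.Preconditions.checkNotNull;

// non-null by default: no @Nonnull noise; only genuinely optional
// parameters would carry @Nullable
public ClusterConfiguration(String configDir, Properties dynamicProperties, String[] args) {
    this.configDir = checkNotNull(configDir);
    this.dynamicProperties = checkNotNull(dynamicProperties);
    this.args = checkNotNull(args);
}
```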


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202300255
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ClusterConfigurationParserFactory}.
+ */
+public class ClusterConfigurationParserFactoryTest extends TestLogger {
+
+   @Test
+   public void testEntrypointClusterConfigurationParsing() throws 
FlinkParseException {
+   final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final String key = "key";
+   final String value = "value";
+   final String arg1 = "arg1";
+   final String arg2 = "arg2";
+   final String[] args = {"--configDir", configDir, 
String.format("-D%s=%s", key, value), arg1, arg2};
+
+   final ClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   final Properties dynamicProperties = 
clusterConfiguration.getDynamicProperties();
+
+   assertThat(dynamicProperties, hasEntry(key, value));
+
+   assertThat(clusterConfiguration.getArgs(), 
arrayContaining(arg1, arg2));
+   }
+
+   @Test
+   public void testOnlyRequiredArguments() throws FlinkParseException {
+   final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+   final String configDir = "/foo/bar";
+   final String[] args = {"--configDir", configDir};
+
+   final ClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
+
+   assertThat(clusterConfiguration.getConfigDir(), 
is(equalTo(configDir)));
+   }
+
+   @Test(expected = FlinkParseException.class)
+   public void testMissingRequiredArgument() throws FlinkParseException {
+   final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
+   final String[] args = {};
+
+   commandLineParser.parse(args);
+   fail("Expected an FlinkParseException.");
--- End diff --

Superfluous `fail`


---


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6320#discussion_r202300519
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ClusterConfigurationParserFactory}.
+ */
+public class ClusterConfigurationParserFactoryTest extends TestLogger {
+
+   @Test
+   public void testEntrypointClusterConfigurationParsing() throws 
FlinkParseException {
+   final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
--- End diff --

Could be moved to a setup method.


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202271606
  
--- Diff: flink-container/docker/build.sh ---
@@ -0,0 +1,118 @@
+#!/bin/sh
--- End diff --

Just want to note that in other scripts, `bash` is used. Also 
`/usr/bin/env` is used to determine the `bash` binary. See `config.sh` for an 
example. 


---


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-13 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
I extended the regular `README.md`.


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202057663
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

@dawidwys Why is it not enough?

> When obtaining a value from the configuration via 
Configuration.getValue(ConfigOption), the deprecated keys will be checked in 
the order provided to this method. The first key for which a value is found 
will be used - that value will be returned.
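
Applied to this discussion, and assuming the option declared `jobmanager.heap.mb` as a deprecated key (which is what is proposed below), the lookup would behave like this:

```
Configuration conf = new Configuration();
conf.setString("jobmanager.heap.mb", "2048"); // only the legacy key is set

// getString first checks "jobmanager.heap.size"; since it is absent,
// the deprecated key is consulted and "2048" is returned
String heap = conf.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
```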


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202020619
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

@dawidwys As I understand, backwards compatibility should be prioritized. 
See https://github.com/apache/flink/pull/5448#issuecomment-394296226

> We need to add an additional MemoryUnit.parse() method that takes the 
"default" unit, so that we parse the old heap sizes such that they are in MB if 
nothing else is specified.
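
A quick illustration of the default-unit parsing referenced above (expected results are my reading of the semantics):

```
// bare legacy values are interpreted as megabytes...
MemorySize.parse("1024", MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); // 1024

// ...while explicit units still take precedence
MemorySize.parse("2g", MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();   // 2048
```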


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202017985
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Would it hurt to add `.withDeprecatedKeys("jobmanager.heap.mb")`? In case 
someone does not use the scripts to start the cluster.


---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-12 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6297
  
cc: @dawidwys @StephanEwen 


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201980407
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Was there a reason against making `JOB_MANAGER_HEAP_MEMORY_MB` a deprecated 
key for `JOB_MANAGER_HEAP_MEMORY` when you worked on FLINK-6469? That is:
```
public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
key("jobmanager.heap.size")
.defaultValue("1024m")
.withDeprecatedKeys("jobmanager.heap.mb")
...
```
I am not sure what happens if someone only configures `jobmanager.heap.mb` 
in `flink-conf.yaml`. Will the value be respected for all deployment modes?

Same holds for the TaskManager config key.


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201981340
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -101,12 +101,12 @@ Usage:
Optional
  -D Dynamic properties
  -d,--detached   Start detached
- -jm,--jobManagerMemory Memory for JobManager Container [in 
MB]
+ -jm,--jobManagerMemory Memory for JobManager Container [with 
unit, if not, use MB]
--- End diff --

nit: I would reword this to: 
> *Memory for JobManager container with optional unit (default: MB).*


---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-12 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6297
  
I will take a look later.


---


[GitHub] flink pull request #6296: When submitting a flink job with yarn-cluster, fli...

2018-07-10 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6296#discussion_r201570938
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -1189,7 +1189,7 @@ private static Path setupSingleLocalResource(
@Override
public FileVisitResult 
visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
throws IOException {
-
+   String fileName = 
file.getFileName().toString();
--- End diff --

`fileName` is not used 


---


[GitHub] flink pull request #6279: [FLINK-9706] Properly wait for termination of JobM...

2018-07-10 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6279#discussion_r201329956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -536,7 +540,25 @@ private JobManagerRunner 
createJobManagerRunner(JobGraph jobGraph) throws Except
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean 
cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
 
-   registerOrphanedJobManagerTerminationFuture(cleanupFuture);
+   registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+   }
+
+   private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
+   
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
+
+   jobManagerTerminationFutures.put(jobId, 
jobManagerRunnerTerminationFuture);
+
+   // clean up the pending termination future
+   jobManagerRunnerTerminationFuture.thenRunAsync(
+   () -> {
+   final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
+
+   //noinspection ObjectEquality
+   if (terminationFuture != null && 
terminationFuture != jobManagerRunnerTerminationFuture) {
+   jobManagerTerminationFutures.put(jobId, 
terminationFuture);
--- End diff --

Here you handle a case where a terminationFuture for a job got replaced. 
Under what circumstances can this happen? Doesn't the `checkState`: 
`Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));` 
prevent this?


---


[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-10 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6240#discussion_r201312394
  
--- Diff: flink-jepsen/src/jepsen/flink/db.clj ---
@@ -175,7 +175,7 @@
   (c/su
 (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir 
"/bin/hadoop classpath` "
 "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
-" " install-dir "/bin/yarn-session.sh -d 
-jm 2048 -tm 2048")))
+" " install-dir "/bin/yarn-session.sh -d 
-jm 2048m -tm 2048m")))
--- End diff --

See https://issues.apache.org/jira/browse/FLINK-9777


---


[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6240#discussion_r200667325
  
--- Diff: jepsen-flink/.gitignore ---
@@ -0,0 +1,17 @@
+*.class
+*.iml
+*.jar
+*.retry
+.DS_Store
+.hg/
+.hgignore
+.idea/
+/.lein-*
+/.nrepl-port
+/checkouts
+/classes
+/target
+pom.xml
+pom.xml.asc
+store
+bin/DataStreamAllroundTestProgram.jar
--- End diff --

Good point. I fixed it.


---


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-06 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
It seems that we have to stick with `0.1.8` for now. According to legal, 
there are no concerns so far: https://issues.apache.org/jira/browse/LEGAL-392


---


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-04 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
`0.1.9` was problematic. Currently testing `0.1.10`.


---


[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6239
  
@zentol No problem, I opened a new one.


---


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
As @zentol mentioned, we might want to exclude this from 
create_source_release.sh.


---


[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-03 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6240

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

## What is the purpose of the change

*Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.*

Previous PR got closed accidentally: 
https://github.com/apache/flink/pull/6239

## Brief change log

  - *Implement Jepsen tests.*


## Verifying this change

This change added tests and can be verified as follows:

  - *The changes themselves are tests.*
  - *Run Jepsen tests in docker containers.*
  - *Run unit tests with `lein test`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no** (at 
least not to Flink))
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will 
as soon as test failures appear) / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann @cewood @zentol @aljoscha 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6240


commit 063e4621a5982b55ee7f7b0935290bbc717a5a45
Author: gyao 
Date:   2018-03-05T21:23:33Z

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.

commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3
Author: gyao 
Date:   2018-07-02T12:21:18Z

fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability.




---


[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6239#discussion_r199690915
  
--- Diff: jepsen-flink/README.md ---
@@ -0,0 +1,60 @@
+# jepsen.flink
+
+A Clojure project based on the 
[Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either 
YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can be currently introduced to the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+There are many more properties other than job availability that could be
+verified but are not yet covered by this test suite, e.g., end-to-end 
exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen 
documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The `scripts/run-tests.sh` script 
documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build 
the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` 
directory of this project's
+root.
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose 
template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+$ cd docker
+$ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker 
exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` 
directory,
+which expects the number of test iterations, and a URI to a Flink 
distribution, e.g.,
+
+./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path 
`/jepsen`.
+This means that changes to the test sources are immediately reflected in 
the control node container.
+Moreover, this allows you to test locally built Flink distributions by 
copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the 
`run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+### Checking the output of tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` 
folder. The final output of every test will be either
+
+Everything looks good! ヽ('ー`)ノ
+
+or
+
+Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
--- End diff --

The Chinese character is the mouth and the eyebrows of the emoji: 
https://github.com/jepsen-io/jepsen/blob/dd197bbce0b92c1ab3423709ac6bb0b2ee853365/jepsen/src/jepsen/core.clj#L532
 


---


[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6239#discussion_r199622404
  
--- Diff: jepsen-flink/README.md ---
@@ -0,0 +1,60 @@
+# jepsen.flink
+
+A Clojure project based on the 
[Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either 
YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can be currently introduced to the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+There are many more properties other than job availability that could be
+verified but are not yet covered by this test suite, e.g., end-to-end 
exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen 
documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The `scripts/run-tests.sh` script 
documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build 
the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` 
directory of this project's
+root.
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose 
template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+$ cd docker
+$ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker 
exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` 
directory,
+which expects the number of test iterations, and a URI to a Flink 
distribution, e.g.,
+
+./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path 
`/jepsen`.
+This means that changes to the test sources are immediately reflected in 
the control node container.
+Moreover, this allows you to test locally built Flink distributions by 
copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the 
`run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+### Checking the output of tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` 
folder. The final output of every test will be either
+
+Everything looks good! ヽ('ー`)ノ
+
+or
+
+Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
--- End diff --

it's a "flipping table emoji"


---


[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6239#discussion_r199507669
  
--- Diff: jepsen-flink/docker/nodes ---
@@ -0,0 +1,3 @@
+n1
--- End diff --

This file must be excluded from the RAT plugin.


---


[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-02 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6239

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

## What is the purpose of the change

*Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.*


## Brief change log

  - *Implement Jepsen tests.*


## Verifying this change

This change added tests and can be verified as follows:

  - *The changes themselves are tests.*
  - *Run Jepsen tests in docker containers.*
  - *Run unit tests with `lein test`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no** (at 
least not to Flink))
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will 
as soon as test failures appear) / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann @cewood


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6239


commit 063e4621a5982b55ee7f7b0935290bbc717a5a45
Author: gyao 
Date:   2018-03-05T21:23:33Z

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.




---


[GitHub] flink pull request #6213: [FLINK-9672] Fail fatally if job submission fails ...

2018-06-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6213#discussion_r198912529
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -322,9 +322,6 @@ private JobManagerRunner 
createJobManagerRunner(JobGraph jobGraph) throws Except
 
@Override
public CompletableFuture> listJobs(Time timeout) {
-   if (jobManagerRunners.isEmpty()) {
--- End diff --

nice


---


[GitHub] flink issue #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage of AMQP ...

2018-06-21 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5410
  
Taking a look once more.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r194109528
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   private final RMQSinkPublishOptions messageCompute;
+   private final ReturnListener returnListener;
 
/**
 * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
 * @param queueName The queue to publish messages to.
 * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+* @param messageCompute A {@link RMQSinkPublishOptions} for providing 
message's routing key and/or properties
  */
-   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema) {
+   private RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema,
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r194109518
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r194109540
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   private final RMQSinkPublishOptions messageCompute;
+   private final ReturnListener returnListener;
 
/**
 * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
 * @param queueName The queue to publish messages to.
 * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+* @param messageCompute A {@link RMQSinkPublishOptions} for providing 
message's routing key and/or properties
  */
-   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema) {
+   private RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema,
+   RMQSinkPublishOptions messageCompute, 
ReturnListener returnListener) {
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193965626
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +155,24 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (publishOptions == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   boolean mandatory = 
publishOptions.computeMandatory(value);
+   boolean immediate = 
publishOptions.computeImmediate(value);
+
+   if (returnListener == null && (mandatory || 
immediate)) {
+   throw new 
IllegalStateException("Setting mandatory and/or immediate flags to true 
requires a ReturnListener.");
+   } else {
+   String rk = 
publishOptions.computeRoutingKey(value);
+   String exchange = 
publishOptions.computeExchange(value);
+
+   channel.basicPublish((exchange != null) 
? exchange : "",
--- End diff --

What do you think about not allowing the return values of `computeRoutingKey` and 
`computeExchange` to be `null`? `basicPublish` does not allow `null`s and will 
throw an exception, so our interfaces would be consistent with the ones 
from RMQ.
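
For illustration, a minimal sketch of how `invoke` could make that contract 
explicit by failing fast (mandatory/immediate handling elided; the use of 
Flink's `org.apache.flink.util.Preconditions` and the error messages are my 
assumptions, not part of this PR):

```
byte[] msg = schema.serialize(value);
// Fail fast instead of silently substituting "" for a null exchange/routing key.
String exchange = Preconditions.checkNotNull(
        publishOptions.computeExchange(value),
        "computeExchange must not return null");
String routingKey = Preconditions.checkNotNull(
        publishOptions.computeRoutingKey(value),
        "computeRoutingKey must not return null");
channel.basicPublish(exchange, routingKey,
        publishOptions.computeProperties(value), msg);
```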


---


[GitHub] flink issue #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage of AMQP ...

2018-06-08 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5410
  
Thanks for updating @pduveau. There are still some minor comments on 
`RMQSinkPublishOptions` and `README` that are open for discussion.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193964437
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeMandatory(IN a);
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193964455
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeMandatory(IN a);
+
+   /**
+* Compute the immediate flag
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeImmediate(IN a);
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193964429
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193964402
  
--- Diff: flink-connectors/flink-connector-rabbitmq/README.md ---
@@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client".
 Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java 
Client")
 must be aware that this may be subject to conditions declared in the 
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 
("GPL") and the Apache License version 2 ("ASL").
+
+# This version provides a mechanism to handle AMQ Messaging features
--- End diff --

ping


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193963772
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
--- End diff --

I see that you have already changed it to `createRMQSinkWithOptions`. I 
think this works.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193963587
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
+   (rk != null) ? rk : "",
+   (returnListener != null) && 
messageCompute.computeMandatory(value),
--- End diff --

I think failing fast is a reasonable behavior. Can you also adapt the 
javadocs?


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193126744
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
--- End diff --

How about `createRMQSinkWithPublishOptions()`? I am not an AMQP specialist; 
if `RMQSinkFeatured` has a special meaning, then leave it.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193101683
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeaturedReturnHandler() throws 
Exception {
+   publishOptions = new DummyPublishOptions();
--- End diff --

`publishOptions` and `returnListener` can be local variables.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193127128
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
+   (rk != null) ? rk : "",
+   (returnListener != null) && 
messageCompute.computeMandatory(value),
--- End diff --

I would prefer that an `IllegalStateException` is thrown if the user code 
returns `true` without a `returnListener`.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193126347
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeaturedReturnHandler() throws 
Exception {
--- End diff --

How about `createRMQSinkWithReturnHandler()`?


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193126169
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeaturedReturnHandler() throws 
Exception {
+   publishOptions = new DummyPublishOptions();
+   returnListener = new DummyReturnHandler();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions, 
returnListener);
--- End diff --

nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)`


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193125374
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeMandatory(IN a);
--- End diff --

How about we add a default implementation that returns RabbitMQ's default 
value:
```
default boolean computeMandatory(IN a) {
    return false;
}
```


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193122068
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   private final RMQSinkPublishOptions messageCompute;
+   private final ReturnListener returnListener;
 
/**
 * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
 * @param queueName The queue to publish messages to.
 * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+* @param messageCompute A {@link RMQSinkPublishOptions} for providing 
message's routing key and/or properties
  */
-   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema) {
+   private RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema,
--- End diff --

We should add some validation to assert that `queueName` is `null` iff 
`messageCompute` is set. We should also check that `RMQConnectionConfig` and 
`SerializationSchema` are not null.
You may want to use `org.apache.flink.util.Preconditions` for that.
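
A rough sketch of what that validation could look like (constructor shape and 
field names taken from the diff above; the generics and error messages are my 
assumptions, not the final implementation):

```
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName,
        SerializationSchema<IN> schema, RMQSinkPublishOptions<IN> messageCompute,
        ReturnListener returnListener) {
    this.rmqConnectionConfig = checkNotNull(rmqConnectionConfig,
            "rmqConnectionConfig must not be null");
    this.schema = checkNotNull(schema, "schema must not be null");
    // queueName must be null iff messageCompute is set.
    checkArgument((queueName == null) == (messageCompute != null),
            "Either queueName or messageCompute must be set, but not both.");
    this.queueName = queueName;
    this.messageCompute = messageCompute;
    this.returnListener = returnListener;
}
```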


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193119522
  
--- Diff: flink-connectors/flink-connector-rabbitmq/README.md ---
@@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client".
 Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java 
Client")
 must be aware that this may be subject to conditions declared in the 
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 
("GPL") and the Apache License version 2 ("ASL").
+
+# This version provides a mechanism to handle AMQ Messaging features
--- End diff --

I think the Javadocs of the constructors are in better shape  than the 
contents of the `README.md`. I think we can drop this paragraph for now. What 
do you think?


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193070753
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -66,12 +79,19 @@ public void before() throws Exception {
}
 
@Test
-   public void openCallDeclaresQueue() throws Exception {
+   public void openCallDeclaresQueueInStandardMode() throws Exception {
createRMQSink();
 
verify(channel).queueDeclare(QUEUE_NAME, false, false, false, 
null);
}
 
+   @Test
+   public void openCallDontDeclaresQueueInFeaturedMode() throws Exception {
+   doThrow(Exception.class).when(channel).queueDeclare(null, 
false, false, false, null);
--- End diff --

There is no assertion in this test. I would rewrite it as:
```
@Test
public void openCallDontDeclaresQueueInFeaturedMode() throws Exception {
    createRMQSinkFeatured();

    verify(channel, never()).queueDeclare(null, false, false, false, null);
}
```


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193120662
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
--- End diff --

I think it is better to assume that `computeRoutingKey` and 
`computeExchange` must return a non-null value. It would be consistent with the 
`basicPublish` method which also rejects nulls.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193126219
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
--- End diff --

nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)`


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193125823
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeMandatory(IN a);
+
+   /**
+* Compute the immediate flag
+* See AMQP API help for values.
+* A ReturnListener is mandatory if this flag can be true (if not it is 
ignored and forced to false)
+* @param a The data used by the sink
+* @return The mandatory flag
+*/
+   boolean computeImmediate(IN a);
--- End diff --

Sorry, I missed that RMQ does not implement this flag. We can remove it or 
add a Java 8 default implementation that returns `false`.
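
For example, analogous to the `computeMandatory` suggestion above:

```
default boolean computeImmediate(IN a) {
    return false;
}
```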


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193126189
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
--- End diff --

nit: Java 7 diamond operator should be preferred, i.e., `new RMQSink<>(...)`


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193122883
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   private final RMQSinkPublishOptions messageCompute;
+   private final ReturnListener returnListener;
 
/**
 * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
 * @param queueName The queue to publish messages to.
 * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+* @param messageCompute A {@link RMQSinkPublishOptions} for providing 
message's routing key and/or properties
  */
-   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema) {
+   private RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema,
+   RMQSinkPublishOptions messageCompute, 
ReturnListener returnListener) {
--- End diff --

`ReturnListener` is not `Serializable`. To work around this, we can add an 
interface like so:

```
public interface SerializableReturnListener extends ReturnListener, Serializable {
}
```


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193123998
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions extends java.io.Serializable {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   String computeRoutingKey(IN a);
+
+   /**
+* Compute the message's properties from the data.
+* @param a The data used by the sink
+* @return The message's properties (can be null)
+*/
+   BasicProperties computeProperties(IN a);
+
+   /**
+* Compute the exchange from the data.
+* @param a The data used by the sink
+* @return The exchange to publish the message to
+*/
+   String computeExchange(IN a);
+
+   /**
+* Compute the mandatory flag used in basic.publish method
--- End diff --

I think it's better to use Javadoc links: 
`{@link Channel#basicPublish(String, String, boolean, boolean, 
BasicProperties, byte[])}`



---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193010305
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
--- End diff --

Let's not change the visibility to `private` for now as it could break 
backwards compatibility.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193004950
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -109,6 +109,7 @@ private RMQConnectionConfig(String host, Integer port, 
String virtualHost, Strin
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
+   * @param createQueue enable or disable queue creation on setup
--- End diff --

This parameter does not exist anymore.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193010378
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
--- End diff --

Is it needed to change the visibility to `protected`?


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193006836
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
verify(connection).close();
}
 
+   @Test
+   public void invokeFeaturedPublishBytesToQueue() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, 
false,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test
+   public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeaturedReturnHandler();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test(expected = RuntimeException.class)
+   public void exceptionDuringFeaturedPublishingIsNotIgnored() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   @Test
+   public void 
exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+   rmqSink.setLogFailuresOnly(true);
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   private class DummyPublishOptions implements 
RMQSinkPublishOptions {
--- End diff --

nit: `serialVersionUID` is missing.


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190658100
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param  The type of the data used by the sink.
+ */
+public interface RMQSinkPublishOptions {
+
+   /**
+* Compute the message's routing key from the data.
+* @param a The data used by the sink
+* @return The routing key of the message
+*/
+   public String computeRoutingKey(IN a);
--- End diff --

The `public` keyword is redundant here because method declarations in an 
interface are always `public`. 


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190888216
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param <IN> The type of the data used by the sink.
+ */
+public interface RMQSinkPublishOptions<IN> {
--- End diff --

I think the interface should be annotated with `@PublicEvolving`.
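
A sketch of what that would look like:
```
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface RMQSinkPublishOptions<IN> {
    // ...
}
```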


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190650550
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param <IN> The type of the data used by the sink.
+ */
+public interface RMQSinkPublishOptions<IN> {
--- End diff --

This must extend `java.io.Serializable` because `RMQSink` has a 
non-transient field of this type.
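
Flink serializes the sink instance when distributing the job, so every non-transient field of `RMQSink` must itself be serializable. A minimal sketch:
```
import java.io.Serializable;

public interface RMQSinkPublishOptions<IN> extends Serializable {
    // ...
}
```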


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190654087
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 ---
@@ -151,7 +151,9 @@ public void open(Configuration config) throws Exception 
{
if (channel == null) {
throw new RuntimeException("None of RabbitMQ 
channels are available");
}
-   setupQueue();
+   if (rmqConnectionConfig.hasToCreateQueueOnSetup()) {
--- End diff --

There was a PR that did something similar and got rejected. See: 
https://github.com/apache/flink/pull/4979 and 
https://issues.apache.org/jira/browse/FLINK-8018


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190888669
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +40,36 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   protected String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema<IN> schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   protected RMQSinkPublishOptions<IN> messageCompute;
 
/**
 * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
 * @param queueName The queue to publish messages to.
 * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
  */
public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema schema) {
-   this.rmqConnectionConfig = rmqConnectionConfig;
+   this(rmqConnectionConfig, schema, null);
this.queueName = queueName;
+   }
+
+   /**
+* @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
+* @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+* @param messageCompute A {@link RMQSinkPublishOptions} for providing 
message's routing key and/or properties
+ */
+   public RMQSink(RMQConnectionConfig rmqConnectionConfig, 
SerializationSchema schema,
--- End diff --

I think the constructor should be annotated with `@PublicEvolving`, and 
`messageCompute` should come after `rmqConnectionConfig` to be consistent with 
the other constructor.
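
A sketch of the suggested signature (the delegating body assumes a private all-args constructor, as suggested in another comment on this PR):
```
@PublicEvolving
public RMQSink(RMQConnectionConfig rmqConnectionConfig,
        RMQSinkPublishOptions<IN> messageCompute,
        SerializationSchema<IN> schema) {
    this(rmqConnectionConfig, null, schema, messageCompute);
}
```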


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190887946
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing 
key and/or the properties.
+ *
+ * @param <IN> The type of the data used by the sink.
+ */
+public interface RMQSinkPublishOptions<IN> {
--- End diff --

Shouldn't the `mandatory` flag in
`basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException`
be covered as well?
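
If it should be, the interface could expose it through an additional method; a hypothetical sketch (the method name is an assumption):
```
/**
 * Compute the mandatory flag passed to basicPublish from the data.
 */
boolean computeMandatory(IN a);
```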


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190648628
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +40,36 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   protected String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema<IN> schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
+
+   protected RMQSinkPublishOptions<IN> messageCompute;
--- End diff --

This field and `queueName` should be `final`. I think you need a third 
private constructor to achieve this:

```
private RMQSink(RMQConnectionConfig rmqConnectionConfig,
        String queueName,
        SerializationSchema<IN> schema,
        RMQSinkPublishOptions<IN> messageCompute) {
    this.rmqConnectionConfig = rmqConnectionConfig;
    this.queueName = queueName;
    this.schema = schema;
    this.messageCompute = messageCompute;
}
```

Then you can write
```
public RMQSink(RMQConnectionConfig rmqConnectionConfig,
        SerializationSchema<IN> schema,
        RMQSinkPublishOptions<IN> messageCompute) {
    this(rmqConnectionConfig, null, schema, messageCompute);
}
```


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-05-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r190647503
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -43,14 +49,27 @@
 public class RMQSinkTest {
 
private static final String QUEUE_NAME = "queue";
+   private static final String EXCHANGE = "exchange";
+   private static final String ROUTING_KEY = "application.component.error";
+   private static final String EXPIRATION = "1";
private static final String MESSAGE_STR = "msg";
private static final byte[] MESSAGE = new byte[1];
+   private static Map<String, Object> headers = new HashMap<String, 
Object>();
+   private static AMQP.BasicProperties props;
--- End diff --

`headers.put("Test", new String("My Value"));` can be simplified to 
`headers.put("Test", "My Value");`
The static field `headers` is mutable state, which should be avoided.
Why not just:
```
private static final AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap("Test", "My Value"))
    .expiration(EXPIRATION)
    .build();
```
Then the static initializer is not even needed.


---


[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6069#discussion_r190605670
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -314,7 +319,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
 
log.info("Requesting blob server port.");
CompletableFuture portFuture = 
sendRequest(
-   BlobServerPortHeaders.getInstance());
+   BlobServerPortHeaders.getInstance(),
+   EmptyMessageParameters.getInstance());
--- End diff --

This is not needed; an overload of `sendRequest` should take care of supplying `EmptyMessageParameters`.
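
A delegating overload keeps call sites short; roughly (the generic bounds are assumptions based on the surrounding REST classes):
```
private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody>
        CompletableFuture<P> sendRequest(M messageHeaders) {
    return sendRequest(messageHeaders, EmptyMessageParameters.getInstance());
}
```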


---


[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6069#discussion_r190605113
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -274,11 +275,17 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
final JobMessageParameters  params = new JobMessageParameters();
params.jobPathParameter.resolve(jobId);
 
-   CompletableFuture responseFuture = 
sendRequest(detailsHeaders, params);
+   CompletableFuture responseFuture = sendRequest(
+   detailsHeaders,
+   params);
 
return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
}
 
+   private Predicate isConnectionProblemOrServiceUnavailable() {
--- End diff --

nit: can be `static` and defined closer to `isConnectionProblemException` 
and `isServiceUnavailable`
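
A sketch, assuming the two helpers also return `Predicate<Throwable>`:
```
private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
    return isConnectionProblemException().or(isServiceUnavailable());
}
```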


---


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190603900
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID 
resourceID) {
allocationId, jobId, resourceManagerId);
 
try {
-   if (resourceManagerConnection == null) {
-   final String message = "TaskManager is not 
connected to a resource manager.";
+   if (!isConnectedToResourceManager(resourceManagerId)) {
+   final String message = 
String.format("TaskManager is not connected to the resource manager %s.", 
resourceManagerId);
log.debug(message);
-   throw new SlotAllocationException(message);
-   }
-
-   if 
(!Objects.equals(resourceManagerConnection.getTargetLeaderId(), 
resourceManagerId)) {
-   final String message = "The leader id " + 
resourceManagerId +
-   " does not match with the leader id of 
the connected resource manager " +
-   
resourceManagerConnection.getTargetLeaderId() + '.';
-
-   log.debug(message);
-   throw new SlotAllocationException(message);
+   throw new TaskManagerException(message);
--- End diff --

We already talked about it offline.


---


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190603365
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ---
@@ -1483,6 +1485,216 @@ public void 
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
}
}
 
+   /**
+* Tests that we ignore slot requests if the TaskExecutor is not
+* registered at a ResourceManager.
+*/
+   @Test
+   public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
+   final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+   final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+
+   final TaskExecutor taskExecutor = 
createTaskExecutor(taskManagerServices);
+
+   taskExecutor.start();
+
+   try {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+   final CompletableFuture 
registrationFuture = new CompletableFuture<>();
+   final CompletableFuture 
taskExecutorResourceIdFuture = new CompletableFuture<>();
+
+   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
+
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
+return registrationFuture;
+});
+
+   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+
+   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+   final ResourceID resourceId = 
taskExecutorResourceIdFuture.get();
+
+   final SlotID slotId = new SlotID(resourceId, 0);
+   final CompletableFuture 
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), 
timeout);
+
+   try {
+   slotRequestResponse.get();
+   fail("We should not be able to request slots 
before the TaskExecutor is registered at the ResourceManager.");
+   } catch (ExecutionException ee) {
+   
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(TaskManagerException.class));
+   }
+   } finally {
+   RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+   }
+   }
+
+   /**
+* Tests that the TaskExecutor tries to reconnect to a ResourceManager 
from which it
+* was explicitly disconnected.
+*/
+   @Test
+   public void testReconnectionAttemptIfExplicitlyDisconnected() throws 
Exception {
+   final long heartbeatInterval = 1000L;
+   final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+   final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+   final TaskExecutor taskExecutor = new TaskExecutor(
+   rpc,
+   
TaskManagerConfiguration.fromConfiguration(configuration),
+   haServices,
+   new TaskManagerServicesBuilder()
+   .setTaskSlotTable(taskSlotTable)
+   .setTaskManagerLocation(taskManagerLocation)
+   .build(),
+   new HeartbeatServices(heartbeatInterval, 1000L),
+   
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+   dummyBlobCacheService,
+   testingFatalErrorHandler);
+
+   taskExecutor.start();
+
+   try {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   final ClusterInformation clusterInformation = new 
ClusterInformation("foobar", 1234);
+   final CompletableFuture 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.gener

[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190603934
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, 
SlotReport slotReport) {
 
blobCacheService.setBlobServerAddress(blobServerAddress);
 
+   establishedResourceManagerConnection = new 
EstablishedResourceManagerConnection(
+   resourceManagerGateway,
+   resourceManagerResourceId,
+   taskExecutorRegistrationId);
+
stopRegistrationTimeout();
}
 
private void closeResourceManagerConnection(Exception cause) {
-   if (resourceManagerConnection != null) {
-
-   if (resourceManagerConnection.isConnected()) {
-   if (log.isDebugEnabled()) {
-   log.debug("Close ResourceManager 
connection {}.",
-   
resourceManagerConnection.getResourceManagerId(), cause);
-   } else {
-   log.info("Close ResourceManager 
connection {}.",
-   
resourceManagerConnection.getResourceManagerId());
-   }
-   
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+   if (establishedResourceManagerConnection != null) {
+   final ResourceID resourceManagerResourceId = 
establishedResourceManagerConnection.getResourceManagerResourceId();
 
-   ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
-   
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+   if (log.isDebugEnabled()) {
+   log.debug("Close ResourceManager connection 
{}.",
--- End diff --

We already talked about it offline.


---


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190583266
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, 
SlotReport slotReport) {
 
blobCacheService.setBlobServerAddress(blobServerAddress);
 
+   establishedResourceManagerConnection = new 
EstablishedResourceManagerConnection(
+   resourceManagerGateway,
+   resourceManagerResourceId,
+   taskExecutorRegistrationId);
+
stopRegistrationTimeout();
}
 
private void closeResourceManagerConnection(Exception cause) {
-   if (resourceManagerConnection != null) {
-
-   if (resourceManagerConnection.isConnected()) {
-   if (log.isDebugEnabled()) {
-   log.debug("Close ResourceManager 
connection {}.",
-   
resourceManagerConnection.getResourceManagerId(), cause);
-   } else {
-   log.info("Close ResourceManager 
connection {}.",
-   
resourceManagerConnection.getResourceManagerId());
-   }
-   
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+   if (establishedResourceManagerConnection != null) {
+   final ResourceID resourceManagerResourceId = 
establishedResourceManagerConnection.getResourceManagerResourceId();
 
-   ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
-   
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+   if (log.isDebugEnabled()) {
+   log.debug("Close ResourceManager connection 
{}.",
--- End diff --

I was wondering whether `cause` can get logged twice:

1.
```
log.debug("Close ResourceManager connection {}.",
    resourceManagerResourceId, cause);
```
2.
```
log.debug("Terminating registration attempts towards ResourceManager {}.",
    resourceManagerConnection.getTargetAddress(), cause);
```


---


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190557380
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID 
resourceID) {
allocationId, jobId, resourceManagerId);
 
try {
-   if (resourceManagerConnection == null) {
-   final String message = "TaskManager is not 
connected to a resource manager.";
+   if (!isConnectedToResourceManager(resourceManagerId)) {
+   final String message = 
String.format("TaskManager is not connected to the resource manager %s.", 
resourceManagerId);
log.debug(message);
-   throw new SlotAllocationException(message);
-   }
-
-   if 
(!Objects.equals(resourceManagerConnection.getTargetLeaderId(), 
resourceManagerId)) {
-   final String message = "The leader id " + 
resourceManagerId +
-   " does not match with the leader id of 
the connected resource manager " +
-   
resourceManagerConnection.getTargetLeaderId() + '.';
-
-   log.debug(message);
-   throw new SlotAllocationException(message);
+   throw new TaskManagerException(message);
--- End diff --

Why not return an exceptional future here?
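
For example (a sketch; `FutureUtils.completedExceptionally` returns a future completed with the given throwable):
```
return FutureUtils.completedExceptionally(new TaskManagerException(message));
```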


---


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190602158
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ---
@@ -1483,6 +1485,216 @@ public void 
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
}
}
 
+   /**
+* Tests that we ignore slot requests if the TaskExecutor is not
+* registered at a ResourceManager.
+*/
+   @Test
+   public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
+   final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+   final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+
+   final TaskExecutor taskExecutor = 
createTaskExecutor(taskManagerServices);
+
+   taskExecutor.start();
+
+   try {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+   final CompletableFuture 
registrationFuture = new CompletableFuture<>();
+   final CompletableFuture 
taskExecutorResourceIdFuture = new CompletableFuture<>();
+
+   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
+
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
+return registrationFuture;
+});
+
+   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+
+   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+   final ResourceID resourceId = 
taskExecutorResourceIdFuture.get();
+
+   final SlotID slotId = new SlotID(resourceId, 0);
+   final CompletableFuture 
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), 
timeout);
+
+   try {
+   slotRequestResponse.get();
+   fail("We should not be able to request slots 
before the TaskExecutor is registered at the ResourceManager.");
+   } catch (ExecutionException ee) {
+   
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(TaskManagerException.class));
+   }
+   } finally {
+   RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+   }
+   }
+
+   /**
+* Tests that the TaskExecutor tries to reconnect to a ResourceManager 
from which it
+* was explicitly disconnected.
+*/
+   @Test
+   public void testReconnectionAttemptIfExplicitlyDisconnected() throws 
Exception {
+   final long heartbeatInterval = 1000L;
+   final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+   final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+   final TaskExecutor taskExecutor = new TaskExecutor(
+   rpc,
+   
TaskManagerConfiguration.fromConfiguration(configuration),
+   haServices,
+   new TaskManagerServicesBuilder()
+   .setTaskSlotTable(taskSlotTable)
+   .setTaskManagerLocation(taskManagerLocation)
+   .build(),
+   new HeartbeatServices(heartbeatInterval, 1000L),
+   
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+   dummyBlobCacheService,
+   testingFatalErrorHandler);
+
+   taskExecutor.start();
+
+   try {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   final ClusterInformation clusterInformation = new 
ClusterInformation("foobar", 1234);
+   final CompletableFuture 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.gener

[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6067#discussion_r190602807
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 ---
@@ -1139,6 +1147,61 @@ public void testReportAllocatedSlot() throws 
Exception {
}
}
 
+   /**
+* Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
+* fails.
+*/
+   @Test
+   public void testSlotRequestFailure() throws Exception {
+   try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
+
+   final SlotRequest slotRequest = new SlotRequest(new 
JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+   slotManager.registerSlotRequest(slotRequest);
+
+   final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, 
String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+   final BlockingQueue<CompletableFuture> 
responseQueue = new ArrayBlockingQueue<>(1);
+
+   final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+   
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> 
{
+
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
--- End diff --

I think you are using spaces for indentation here; the surrounding code is indented with tabs.


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190500268
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
+
+   private JobSchedulingStatus jobSchedulingStatus = 
JobSchedulingStatus.PENDING;
+
+   private boolean containsJob = false;
+
+   private TestingRunningJobsRegistry(JobID jobId) {
+   this.jobId = jobId;
+   }
+
+   public CompletableFuture getClearedFuture() {
+   return clearedFuture;
+   }
+
+   @Override
+   public void setJobRunning(JobID jobID) throws IOException {
+   checkJobId(jobID);
+   containsJob = true;
+   jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+   }
+
+   private void checkJobId(JobID jobID) {
+   Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --

Not the best variable names here: `jobId` vs. `jobID`


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548385
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
--- End diff --

For simplicity I would use a `CountDownLatch` here.
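
A sketch of what that could look like (the `clearJob` override is an assumption about where the latch would be counted down):
```
private final CountDownLatch clearedLatch = new CountDownLatch(1);

@Override
public void clearJob(JobID jobID) {
    checkJobId(jobID);
    containsJob = false;
    clearedLatch.countDown();
}

// in the test, instead of clearedFuture.get():
assertTrue(clearedLatch.await(10, TimeUnit.SECONDS));
```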


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548607
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
--- End diff --

Maybe `SingleRunningJobRegistry`.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189004536
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID 
resourceId) {
runAsync(() -> {
log.info("The heartbeat of ResourceManager with 
id {} timed out.", resourceId);
 
-   closeResourceManagerConnection(
-   new TimeoutException(
-   "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
+   if (establishedResourceManagerConnection != 
null && 
establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId))
 {
+   final String resourceManagerAddress = 
establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
--- End diff --

Declaration and assignment can be moved closer to 
`createResourceManagerConnection`.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189012246
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception 
cause) {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
+
+   startRegistrationTimeout();
+   }
+
+   private void startRegistrationTimeout() {
+   final Time maxRegistrationDuration = 
taskManagerConfiguration.getMaxRegistrationDuration();
+
+   if (maxRegistrationDuration != null) {
+   final UUID newRegistrationTimeoutId = UUID.randomUUID();
+   currentRegistrationTimeoutId = newRegistrationTimeoutId;
+   scheduleRunAsync(() -> 
registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+   }
+   }
+
+   private void stopRegistrationTimeout() {
+   currentRegistrationTimeoutId = null;
+   }
+
+   private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+   if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) 
{
+   final Time maxRegistrationDuration = 
taskManagerConfiguration.getMaxRegistrationDuration();
+
+   onFatalError(
+   new RegistrationTimeoutException(
+   String.format("Could not register at 
the ResourceManager within the specified maximum " +
+   "registration duration %s. This 
indicates a problem with this instance. Terminating now.",
+   maxRegistrationDuration)));
+   } else {
+   log.debug("Ignoring outdated registration timeout.");
--- End diff --

I think this will be logged even if the registration succeeded: `stopRegistrationTimeout()` only nulls `currentRegistrationTimeoutId`, so the already scheduled callback still fires and ends up in this branch.


---

