[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests

2018-03-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5718
  
I don't want to make a fuss over a single test, so for the purposes of this 
test it's fine to just remove the timeout.

In general, however, I think it would be pretty neat if we added such a 
`Timeout` to the `TestLogger` class. I see the problem of debugging tests that 
have timeouts, but removing timeouts altogether is the wrong conclusion imo. 
Instead, we could implement the `Timeout` such that it doesn't fail the test if 
a certain profile/property is set.

With this we keep the benefits of test timeouts (CI service independence, 
less special behavior locally vs CI, fixed upper bound for test times which is 
particularly useful for new tests, "guarantee" that the test terminates) while 
still allowing debugging. In fact we may end up improving the debugging 
situation by consolidating how timeouts are implemented instead of each test 
rolling their own solution that you can't disable.
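
A JUnit-free sketch of that idea, to make the proposal concrete: a helper that enforces an upper bound on a task unless a system property switches the limit off. The class name `GuardedTimeout` and the property name `test.timeout.disabled` are hypothetical and do not exist in Flink; a real version would presumably live in a JUnit `Timeout` rule instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GuardedTimeout {

    // Hypothetical property name; pass -Dtest.timeout.disabled=true when debugging.
    public static final String DISABLE_PROPERTY = "test.timeout.disabled";

    /** Runs the task and fails with a TimeoutException unless timeouts are disabled. */
    public static <T> T run(Callable<T> task, long timeout, TimeUnit unit) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            if (Boolean.getBoolean(DISABLE_PROPERTY)) {
                return future.get(); // debugging: wait as long as it takes
            }
            return future.get(timeout, unit); // throws TimeoutException on expiry
        } catch (ExecutionException e) {
            // unwrap so that callers see the task's own failure
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow(); // interrupt a task that is still running
        }
    }
}
```

Reading the property on every call keeps the switch in one place, so a debugger session only needs the flag and no code changes.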

The travis watchdog _cannot_ be removed as it covers the entire maven 
process, which from time to time locks up outside of tests. That said, the fact 
that we have to rely on the travis watchdog _to ensure that tests terminate_ is 
a bad sign. Not to mention that it already forced us to introduce workarounds 
to make tests "compatible", like the kafka tests and others that print stuff 
for the sole purpose of not triggering it.



---


[GitHub] flink issue #5690: [FLINK-8935][tests] Extend MiniClusterClient

2018-03-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5690
  
merging...


---


[GitHub] flink pull request #5722: [FLINK-8958][tests] Port TaskCancelAsyncProducerCo...

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5722

[FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6

## What is the purpose of the change

This PR ports the `TaskCancelAsyncProducerConsumerITCase` to flip6. The 
existing test was renamed to `LegacyTaskCancelAsyncProducerConsumerITCase`, and a 
ported copy was added.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8958

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5722.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5722


commit 9df4a4ff1fadc12f37c94659ae13fc6ad1623d2d
Author: zentol <chesnay@...>
Date:   2018-03-19T14:16:18Z

[FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6




---


[GitHub] flink issue #5710: [FLINK-8948][runtime] Fix IllegalStateException when clos...

2018-03-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5710
  
merging.


---


[GitHub] flink pull request #5696: [hotfix][javadoc] fix doc of SlotProvider.allocate...

2018-03-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5696#discussion_r17544
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 ---
@@ -46,9 +46,10 @@
/**
 * Allocating slot with specific requirement.
 *
+* @param slotRequestId identifying the slot request to cancel
--- End diff --

I will remove "to cancel" while merging


---


[GitHub] flink issue #5708: [FLINK-8984][network] Drop taskmanager.exactly-once.block...

2018-03-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5708
  
What would happen if 
`taskmanager.network.credit-based-flow-control.enabled` is enabled, but 
`taskmanager.exactly-once.blocking.data.enabled` is disabled?

Is this an invalid setting?


---


[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests

2018-03-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5718
  
This change appears to be focused mostly on Travis, which I dislike a bit. 
I'd rather add a custom `Timeout` that prints the thread-dump so we still 
get nice behavior when running things locally/in the IDE.

```
// imports needed by the snippet (assumes JUnit 4.12+)
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.junit.runners.model.TestTimedOutException;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;

@Rule
public final Timeout timeout = new MyTimeOut(10, TimeUnit.MILLISECONDS);

public static class MyTimeOut extends Timeout {

    public MyTimeOut(long timeout, TimeUnit timeUnit) {
        super(timeout, timeUnit);
    }

    @Override
    public Statement apply(Statement base, Description description) {
        Statement fail = super.apply(base, description);
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                try {
                    fail.evaluate();
                } catch (TestTimedOutException e) {
                    // print the dump only once the timeout has actually fired
                    System.out.println(crunchifyGenerateThreadDump());
                    throw e;
                }
            }
        };
    }
}

public static String crunchifyGenerateThreadDump() {
    final StringBuilder dump = new StringBuilder();
    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    // fetch up to 100 stack frames for every live thread
    final ThreadInfo[] threadInfos =
        threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
    for (ThreadInfo threadInfo : threadInfos) {
        dump.append('"');
        dump.append(threadInfo.getThreadName());
        dump.append("\" ");
        final Thread.State state = threadInfo.getThreadState();
        dump.append("\n   java.lang.Thread.State: ");
        dump.append(state);
        final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
        for (final StackTraceElement stackTraceElement : stackTraceElements) {
            dump.append("\nat ");
            dump.append(stackTraceElement);
        }
        dump.append("\n\n");
    }
    return dump.toString();
}
```


---


[GitHub] flink pull request #5720: [FLINK-8957][tests] Port JMXJobManagerMetricTest t...

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5720

[FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6

Based on #5690.

## What is the purpose of the change

Ports the `JMXJobManagerMetricTest` to use `MiniClusterResource`.

## Verifying this change

Run `JMXJobManagerMetricTest` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8957

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5720.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5720


commit 42e97294d1919c7020c687f9c389f61871cc3c6a
Author: zentol <chesnay@...>
Date:   2018-03-19T13:17:34Z

[FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6




---


[GitHub] flink pull request #5719: [FLINK-8959][tests] Port AccumulatorLiveITCase to ...

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5719

[FLINK-8959][tests] Port AccumulatorLiveITCase to flip6 

Based on #5701.

## What is the purpose of the change

This PR ports the `AccumulatorLiveITCase` to flip6. The existing test was 
renamed to `LegacyAccumulatorLiveITCase`, and a ported copy was added.

The heartbeat interval is not configurable for legacy clusters and is 
hard-coded to 5 seconds. Porting this test (in a reliable fashion) without 
relying on internal/test APIs would've increased the test duration, so I 
decided to just leave it as it is.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8959

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5719.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5719


commit c676e2895f9adcaaddc30f12a7378ce5d6089fd6
Author: zentol <chesnay@...>
Date:   2018-03-14T13:21:27Z

[FLINK-8942][runtime] Pass heartbeat target ResourceID

commit 5858766794f81bd111a6380ef729a9c91b4b9ced
Author: zentol <chesnay@...>
Date:   2018-03-14T17:52:16Z

[FLINK-8881][runtime] Send accumulator updates via heartbeats

commit 040c8d053501e4aba609efc9757905cd6b553ecb
Author: zentol <chesnay@...>
Date:   2018-03-07T10:05:42Z

[FLINK-8935][tests] Implement MiniClusterClient#getAccumulators

commit 84b686146d5f7176e6c1dcec9bbdf51b9c601457
Author: zentol <chesnay@...>
Date:   2018-03-19T12:59:22Z

[FLINK-8959][tests] Port AccumulatorLiveITCase to flip6




---


[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5715

[FLINK-8956][tests] Port RescalingITCase to flip6 

Based on #5690.

## What is the purpose of the change

Ports the `RescalingITCase` to use `MiniClusterResource`.

## Verifying this change

Run `RescalingITCase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8956

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5715.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5715






---


[GitHub] flink pull request #5714: [FLINK-8925][tests] Enable Flip6 on travis

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5714

[FLINK-8925][tests] Enable Flip6 on travis 

## What is the purpose of the change

This PR enables Flip6 for half of our travis profiles.

## Brief change log

* add `Old` junit category 
* mark various tests with the new category 
* remove 2 existing flip6 profiles
* remove `OldAndFlip6` category (implicitly applies to all tests not 
annotated with `Old` or `Flip6`)
* invert flip6 profile; it now *disables* flip6 and is called `old`
* add activation property to `old` profile
* activate `old` profile in half of our travis profiles


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8924

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5714.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5714


commit 88923d3420dd8fc7a9c227cb45d020d8a38ab61d
Author: zentol <chesnay@...>
Date:   2018-03-12T12:04:50Z

[FLINK-8924][tests] Add "Old" junit category

commit 164b2ce94a1b54543810599a83b3e361358f4968
Author: zentol <chesnay@...>
Date:   2018-03-12T12:05:55Z

[FLINK-8925][tests] Enable Flip6 on travis




---


[GitHub] flink issue #5689: [FLINK-4569][tests] Respect exceptions thrown in thread i...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5689
  
merging,


---


[GitHub] flink pull request #5701: 8703 c savepoint

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5701

8703 c savepoint

Based on #5690 and #5699.

## What is the purpose of the change

With this PR accumulator updates are sent via heartbeats from the 
TaskManager to JobManagers.
The SavepointMigrationTestBase was also ported to flip6, serving as 
preliminary verification until the other accumulator tests are ported.

## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

* run SavepointMigrationTestBase

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_c_savepoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5701


commit 14bcb66d858ee3e9488a51df7bfa1e36ec97f463
Author: zentol <chesnay@...>
Date:   2018-03-14T13:21:27Z

[FLINK-8942][runtime] Pass heartbeat target ResourceID

commit 761fd90f07335883429fc80fb4032b9ef28d32f5
Author: zentol <chesnay@...>
Date:   2018-03-14T17:52:16Z

[FLINK-8881][runtime] Send accumulator updates via heartbeats

commit 280874939c4d77e893da80e1e40acdcc869280bb
Author: zentol <chesnay@...>
Date:   2018-03-07T10:05:42Z

[FLINK-8935][tests] Implement MiniClusterClient#getAccumulators

commit d2fa754d9c5b68672ddc16233cdf390dabfd17c0
Author: zentol <chesnay@...>
Date:   2018-03-06T12:26:59Z

[FLINK-8935][tests] Implement MiniClusterClient#triggerSavepoint

commit ea04fd437d305a76c74edc6aa6c473a1fa917895
Author: zentol <chesnay@...>
Date:   2018-02-26T13:54:07Z

[FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterResource




---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5699

[FLINK-8942][runtime] Pass heartbeat target ResourceID

## What is the purpose of the change

With this PR the heartbeat target `ResourceID` is passed to the 
`HeartbeatListener` when retrieving the payload to send. This allows the 
listener to create target-dependent payloads.

The primary use-case is FLINK-8881, where accumulators are sent via 
heartbeats to the JobManager. Here we only want to send accumulators for the 
relevant job, and not for all jobs.
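
To illustrate what a target-dependent payload could look like, here is a minimal sketch with simplified stand-in types. Only the names `ResourceID`, `HeartbeatListener` and `retrievePayload` come from this PR; their shapes here, as well as `AccumulatorListener` and its `update` method, are assumptions made for the example and not the actual Flink signatures.

```java
import java.util.HashMap;
import java.util.Map;

public class TargetPayloadSketch {

    /** Simplified stand-in for Flink's ResourceID (assumption: it just wraps a string). */
    public static final class ResourceID {
        final String id;

        public ResourceID(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Simplified listener: the payload may now depend on the heartbeat target. */
    public interface HeartbeatListener<O> {
        O retrievePayload(ResourceID target);
    }

    /** Hypothetical TaskManager-side listener that keeps accumulators per JobManager. */
    public static final class AccumulatorListener implements HeartbeatListener<Map<String, Long>> {
        private final Map<ResourceID, Map<String, Long>> byJobManager = new HashMap<>();

        public void update(ResourceID jobManager, String name, long value) {
            byJobManager.computeIfAbsent(jobManager, k -> new HashMap<>()).put(name, value);
        }

        @Override
        public Map<String, Long> retrievePayload(ResourceID target) {
            // only the accumulators that belong to this target are sent
            return byJobManager.getOrDefault(target, new HashMap<>());
        }
    }
}
```

Without the target parameter the listener would have to return the accumulators of all jobs to every JobManager.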

## Brief change log

* add a `ResourceID` parameter to `HeartbeatListener#retrievePayload`
* modify return type of `HeartbeatManagerImpl#getHeartbeatTargets` to also 
contain the target `ResourceID`

## Verifying this change

This change added tests:

`HeartbeatManagerTest`:
* `testHeartbeatManagerTargetPayload`
* `testHeartbeatManagerSenderTargetPayload`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8942

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5699


commit d264dbbd260abc995ced8095a220260e8e0b3f1a
Author: zentol <chesnay@...>
Date:   2018-03-14T13:21:27Z

[FLINK-8942][runtime] Pass heartbeat target ResourceID




---


[GitHub] flink issue #5666: [FLINK-8703][tests] Port KafkaShortRetentionTestBase to M...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5666
  
merging



---


[GitHub] flink issue #5667: [FLINK-8703][tests] Port NotSoMiniClusterIterations to Mi...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5667
  
merging


---


[GitHub] flink issue #5668: [FLINK-8703][tests] Port StreamingScalabilityAndLatency t...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5668
  
merging


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174450032
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
 ---
@@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
}
 
@Override
-   public byte[] getSerializedValue(K key, N namespace) throws IOException 
{
-   Preconditions.checkState(namespace != null, "No namespace 
given.");
-   Preconditions.checkState(key != null, "No key given.");
+   public byte[] getSerializedValue(
+   byte[] serializedKeyAndNamespace,
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws 
Exception {
 
-   HashMap<UK, UV> result = stateTable.get(key, namespace);
+   Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
 
-   if (null == result) {
+   Tuple2<K, N> keyAndNamespace = 
KvStateSerializer.deserializeKeyAndNamespace(
+   serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
+
+   Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, 
keyAndNamespace.f1);
+
+   if (result == null) {
return null;
}
 
-   TypeSerializer userKeySerializer = 
stateDesc.getKeySerializer();
-   TypeSerializer userValueSerializer = 
stateDesc.getValueSerializer();
+   final HashMapSerializer<UK, UV> serializer = 
(HashMapSerializer<UK, UV>) valueSerializer;
--- End diff --

I mean, why isn't the signature of `KvStateSerializer.serializeMap`:
```
KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer);
```

with the following implementation:
```
...
serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) {
    if (map != null) {
        DataOutputSerializer dos = new DataOutputSerializer(32);
        serializer.serialize(map, dos);
        return dos.getCopyOfBuffer();
    } else {
        return null;
    }
}
```

Why deal with the map key/value entries at all outside the serializer?
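
A self-contained sketch of the same shape, using plain `java.io` streams rather than Flink classes. The `Serializer` interface is a hypothetical one-method stand-in for `TypeSerializer`, and `STRING_INT_MAP` is only an example format; neither is Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

public class SerializeMapSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer, reduced to one method. */
    public interface Serializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
    }

    /** The helper never touches individual entries; the map serializer owns the format. */
    public static <K, V> byte[] serializeMap(Map<K, V> map, Serializer<Map<K, V>> serializer)
            throws IOException {
        if (map == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(32);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serializer.serialize(map, out);
        }
        return bytes.toByteArray();
    }

    /** Example map serializer: entry count first, then each key/value pair. */
    public static final Serializer<Map<String, Integer>> STRING_INT_MAP = (map, out) -> {
        out.writeInt(map.size());
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeInt(entry.getValue());
        }
    };
}
```

With this split the caller no longer needs to know the user key and value serializers; everything about the entry layout stays inside the map serializer.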


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r17107
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link 
KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+   private final InternalKvState<K, N, V> state;
+   private final KvStateInfo<K, N, V> stateInfo;
+
+   private final boolean isSerializerStateless;
+
+   private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> 
serializerCache;
+
+   public KvStateEntry(final InternalKvState<K, N, V> state) {
+   this.state = Preconditions.checkNotNull(state);
+   this.stateInfo = new KvStateInfo<>(
+   state.getKeySerializer(),
+   state.getNamespaceSerializer(),
+   state.getValueSerializer()
+   );
+   this.serializerCache = new ConcurrentHashMap<>();
+   this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
+   }
+
+   public InternalKvState<K, N, V> getState() {
+   return state;
+   }
+
+   public KvStateInfo<K, N, V> getInfoForCurrentThread() {
+   return isSerializerStateless
+   ? stateInfo
+   : 
serializerCache.computeIfAbsent(Thread.currentThread(), t -> 
stateInfo.duplicate());
+   }
+
+   public void clear() {
+   if (serializerCache != null) {
+   serializerCache.clear();
+   }
+   }
+
+   @VisibleForTesting
+   public int getCacheSize() {
+   return serializerCache == null ? -1 : serializerCache.size();
--- End diff --

unnecessary null check


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r17245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
@@ -39,29 +39,27 @@
private final InternalKvState<K, N, V> state;
private final KvStateInfo<K, N, V> stateInfo;
 
+   private final boolean isSerializerStateless;
+
private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> 
serializerCache;
 
public KvStateEntry(final InternalKvState<K, N, V> state) {
-
this.state = Preconditions.checkNotNull(state);
this.stateInfo = new KvStateInfo<>(
state.getKeySerializer(),
state.getNamespaceSerializer(),
state.getValueSerializer()
);
-
-   this.serializerCache =
-   stateInfo.duplicate() == stateInfo
-   ? null  
// if the serializers are stateless, we do not need a 
cache
-   : new ConcurrentHashMap<>();
+   this.serializerCache = new ConcurrentHashMap<>();
+   this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
--- End diff --

-> areSerializersStateless?


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r17070
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link 
KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+   private final InternalKvState<K, N, V> state;
+   private final KvStateInfo<K, N, V> stateInfo;
+
+   private final boolean isSerializerStateless;
+
+   private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> 
serializerCache;
+
+   public KvStateEntry(final InternalKvState<K, N, V> state) {
+   this.state = Preconditions.checkNotNull(state);
+   this.stateInfo = new KvStateInfo<>(
+   state.getKeySerializer(),
+   state.getNamespaceSerializer(),
+   state.getValueSerializer()
+   );
+   this.serializerCache = new ConcurrentHashMap<>();
+   this.isSerializerStateless = stateInfo.duplicate() == stateInfo;
+   }
+
+   public InternalKvState<K, N, V> getState() {
+   return state;
+   }
+
+   public KvStateInfo<K, N, V> getInfoForCurrentThread() {
+   return isSerializerStateless
+   ? stateInfo
+   : 
serializerCache.computeIfAbsent(Thread.currentThread(), t -> 
stateInfo.duplicate());
+   }
+
+   public void clear() {
+   if (serializerCache != null) {
--- End diff --

unnecessary null check


---


[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5697

[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

## What is the purpose of the change

This PR ports the `ScheduleOrUpdateConsumersTest` to flip6. The existing 
test was renamed to `LegacyScheduleOrUpdateConsumersTest`, and a ported copy 
was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8704_schedule

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5697.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5697


commit d7f42bf65177bb57f8159f7866397f4e55c5d9f0
Author: zentol <chesnay@...>
Date:   2018-03-12T12:21:01Z

[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest




---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174381716
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 ---
@@ -686,7 +686,7 @@ public void testClientServerIntegration() throws 
Throwable {
state.update(201 + i);
 
// we know it must be a KvStat but this is not 
exposed to the user via State
--- End diff --

typo: KvStat


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174380197
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
 ---
@@ -20,13 +20,16 @@
 
 import org.apache.flink.api.common.state.MapState;
 
+import java.util.Map;
+
 /**
  * The peer to the {@link MapState} in the internal state type hierarchy.
  *
  * See {@link InternalKvState} for a description of the internal state 
hierarchy.
  *
- * @param  The type of the namespace
- * @param  Type of the values folded into the state
- * @param  Type of the value in the state
+ * @param   The type of key the state is associated to
+ * @param   The type of the namespace
+ * @param  Type of the values folded into the state
--- End diff --

the description seems like a copy from `FoldingState`


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174385360
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 ---
@@ -40,6 +54,81 @@
  */
 public class KvStateRegistryTest extends TestLogger {
 
+   @Test
+   public void testKvStateEntry() throws InterruptedException {
+   final int threads = 10;
+
+   final CountDownLatch latch1 = new CountDownLatch(threads);
+   final CountDownLatch latch2 = new CountDownLatch(1);
+
+   final List<KvStateInfo> infos = 
Collections.synchronizedList(new ArrayList<>());
+
+   final JobID jobID = new JobID();
+
+   final JobVertexID jobVertexId = new JobVertexID();
+   final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+   final String registrationName = "foobar";
+
+   final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+   final KvStateID stateID = kvStateRegistry.registerKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   new DummyKvState()
+   );
+
+   for (int i = 0; i < threads; i++) {
+   new Thread(() -> {
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+   final KvStateInfo stateInfo = 
kvState.getInfoForCurrentThread();
+   infos.add(stateInfo);
+
+   latch1.countDown();
+   try {
+   latch2.await();
+   } catch (InterruptedException e) {
+   Assert.fail(e.getMessage());
--- End diff --

this would never directly fail the test, and we may lose the exception
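
The reason is that an `AssertionError` thrown inside a spawned thread only terminates that thread; JUnit observes the test thread alone. One possible way around it, sketched without JUnit, is to record the first failure and rethrow it on the calling thread. The class and method names below are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerFailureSketch {

    /** Runs the task on the given number of threads and rethrows the first failure. */
    public static void runOnThreads(int threads, Runnable task) throws Throwable {
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(threads);

        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    firstError.compareAndSet(null, t); // keep the first failure only
                } finally {
                    done.countDown();
                }
            }).start();
        }

        done.await(); // wait until every worker has finished
        Throwable error = firstError.get();
        if (error != null) {
            throw error; // the failure now surfaces on the calling (test) thread
        }
    }
}
```

Submitting the work to an `ExecutorService` and calling `get()` on the returned futures would achieve the same effect.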


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174380673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
 ---
@@ -30,18 +30,17 @@
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link ReducingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link ReducingState} that is snapshotted into 
files.
  *
- * @param  The type of the key.
- * @param  The type of the namespace.
- * @param  The type of the value added to the state.
- * @param  The type of the value stored in the state (the accumulator 
type).
- * @param  The type of the value returned from the state.
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the value added to the state.
+ * @param The type of the value stored in the state (the 
accumulator type).
+ * @param The type of the value returned from the state.
--- End diff --

indentation


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174380400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -24,9 +24,11 @@
  * The peer to the {@link AppendingState} in the internal state type 
hierarchy.
  * 
  * See {@link InternalKvState} for a description of the internal state 
hierarchy.
- * 
- * @paramThe type of the namespace
- * @param   The type of elements added to the state
- * @param  The type of the 
+ *
+ * @param   The type of key the state is associated to
+ * @param   The type of the namespace
+ * @param  The type of elements added to the state
+ * @param  The type of elements in the state
+ * @param The type of the resulting element in the state
--- End diff --

indentation is weird


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174384860
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 ---
@@ -40,6 +54,81 @@
  */
 public class KvStateRegistryTest extends TestLogger {
 
+   @Test
+   public void testKvStateEntry() throws InterruptedException {
+   final int threads = 10;
+
+   final CountDownLatch latch1 = new CountDownLatch(threads);
+   final CountDownLatch latch2 = new CountDownLatch(1);
+
+   final List<KvStateInfo> infos = 
Collections.synchronizedList(new ArrayList<>());
+
+   final JobID jobID = new JobID();
+
+   final JobVertexID jobVertexId = new JobVertexID();
+   final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+   final String registrationName = "foobar";
+
+   final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+   final KvStateID stateID = kvStateRegistry.registerKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   new DummyKvState()
+   );
+
+   for (int i = 0; i < threads; i++) {
+   new Thread(() -> {
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+   final KvStateInfo stateInfo = 
kvState.getInfoForCurrentThread();
+   infos.add(stateInfo);
+
+   latch1.countDown();
+   try {
+   latch2.await();
+   } catch (InterruptedException e) {
+   Assert.fail(e.getMessage());
+   }
+
+   }).start();
+   }
+
+   latch1.await();
+
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+
+   // verify that all the threads are done correctly.
+   Assert.assertEquals(threads, infos.size());
+   Assert.assertEquals(threads, kvState.getCacheSize());
+
+   latch2.countDown();
+
+   for (KvStateInfo infoA: infos) {
+   boolean found = false;
+   for (KvStateInfo infoB: infos) {
+   if (infoA == infoB) {
+   if (found) {
+   Assert.fail("Already found");
+   }
+   found = true;
+   } else {
+   Assert.assertTrue(infoA != infoB && 
infoA.equals(infoB));
--- End diff --

`infoA != infoB` is redundant
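
To spell out why: the `else` branch is only reached when `infoA == infoB` is false, so the reference check there always holds and only `equals` does any work. A minimal sketch of the same verification as a standalone helper follows. The class and method names are hypothetical and not part of the PR, and it assumes `equals` is symmetric.

```java
import java.util.List;

final class DistinctButEqualCheck {

    // Hypothetical helper, not part of the PR: returns true only if no two
    // positions hold the same instance and all elements are equal to each other.
    static <T> boolean allDistinctButEqual(List<T> items) {
        for (int i = 0; i < items.size(); i++) {
            for (int j = i + 1; j < items.size(); j++) {
                T a = items.get(i);
                T b = items.get(j);
                if (a == b) {
                    return false; // the same instance was handed out twice
                }
                // a != b is already known here, so only equality is checked
                if (!a.equals(b)) {
                    return false; // the copies are not equal
                }
            }
        }
        return true;
    }
}
```

Comparing by index instead of tracking a `found` flag also removes the need to explain what the flag means.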


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174381475
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java 
---
@@ -136,13 +138,13 @@ public void unregisterKvState(
}
 
/**
-* Returns the KvState instance identified by the given KvStateID or
-* null if none is registered.
+* Returns the KvState instance identified by the given KvStateID along 
with
--- End diff --

would rephrase to `KvStateEntry` containing the `KvState`


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174380623
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -29,20 +29,19 @@
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link FoldingState} that is snapshotted into 
files.
  *
- * @param  The type of the key.
- * @param  The type of the namespace.
- * @param  The type of the values that can be folded into the state.
- * @param  The type of the value in the folding state.
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param   The type of the values that can be folded into the 
state.
+ * @param The type of the value in the folding state.
--- End diff --

indentation is funky


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174383199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Metadata about a {@link InternalKvState}. This includes the serializers 
for
+ * the key, the namespace, and the values kept in the state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+public class KvStateInfo<K, N, V> {
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer namespaceSerializer;
+   private final TypeSerializer stateValueSerializer;
+
+   public KvStateInfo(
+   final TypeSerializer keySerializer,
+   final TypeSerializer namespaceSerializer,
+   final TypeSerializer stateValueSerializer
+   ) {
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.namespaceSerializer = 
Preconditions.checkNotNull(namespaceSerializer);
+   this.stateValueSerializer = 
Preconditions.checkNotNull(stateValueSerializer);
+   }
+
+   /**
+* @return The serializer for the key the state is associated to.
+*/
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   /**
+* @return The serializer for the namespace the state is associated to.
+*/
+   public TypeSerializer getNamespaceSerializer() {
+   return namespaceSerializer;
+   }
+
+   /**
+* @return The serializer for the values kept in the state.
+*/
+   public TypeSerializer getStateValueSerializer() {
+   return stateValueSerializer;
+   }
+
+   /**
+* Creates a deep copy of the current {@link KvStateInfo} by duplicating
+* all the included serializers.
+*
+* This method assumes correct implementation of the {@link 
TypeSerializer#duplicate()}
+* method of the included serializers.
+*/
+   public KvStateInfo<K, N, V> duplicate() {
+   final TypeSerializer dupKeySerializer = 
keySerializer.duplicate();
+   final TypeSerializer dupNamespaceSerializer = 
namespaceSerializer.duplicate();
+   final TypeSerializer dupSVSerializer = 
stateValueSerializer.duplicate();
+
+   if (
+   dupKeySerializer == keySerializer &&
--- End diff --

odd formatting


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174385834
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 ---
@@ -40,6 +54,81 @@
  */
 public class KvStateRegistryTest extends TestLogger {
 
+   @Test
+   public void testKvStateEntry() throws InterruptedException {
+   final int threads = 10;
+
+   final CountDownLatch latch1 = new CountDownLatch(threads);
+   final CountDownLatch latch2 = new CountDownLatch(1);
+
+   final List<KvStateInfo> infos = 
Collections.synchronizedList(new ArrayList<>());
+
+   final JobID jobID = new JobID();
+
+   final JobVertexID jobVertexId = new JobVertexID();
+   final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+   final String registrationName = "foobar";
+
+   final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+   final KvStateID stateID = kvStateRegistry.registerKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   new DummyKvState()
+   );
+
+   for (int i = 0; i < threads; i++) {
+   new Thread(() -> {
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+   final KvStateInfo stateInfo = 
kvState.getInfoForCurrentThread();
+   infos.add(stateInfo);
+
+   latch1.countDown();
+   try {
+   latch2.await();
+   } catch (InterruptedException e) {
+   Assert.fail(e.getMessage());
+   }
+
+   }).start();
+   }
+
+   latch1.await();
+
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+
+   // verify that all the threads are done correctly.
+   Assert.assertEquals(threads, infos.size());
+   Assert.assertEquals(threads, kvState.getCacheSize());
+
+   latch2.countDown();
+
+   for (KvStateInfo infoA: infos) {
+   boolean found = false;
+   for (KvStateInfo infoB: infos) {
+   if (infoA == infoB) {
+   if (found) {
--- End diff --

`found` needs a better name or a comment what it means


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174379723
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
 ---
@@ -26,12 +26,14 @@
  * The peer to the {@link MergingState} in the internal state type 
hierarchy.
  * 
  * See {@link InternalKvState} for a description of the internal state 
hierarchy.
- * 
- * @paramThe type of the namespace
- * @param   The type of elements added to the state
- * @param  The type of elements 
+ *
+ * @param   The type of key the state is associated to
+ * @param   The type of the namespace
+ * @param  The type of elements added to the state
+ * @param  The type of elements in the state
+ * @param The type of elements
--- End diff --

indentation is off and the description deserves an extension


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174380744
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 ---
@@ -28,18 +28,19 @@
 import java.util.Collection;
 
 /**
- * Base class for {@link MergingState} ({@link 
org.apache.flink.runtime.state.internal.InternalMergingState})
- * that is stored on the heap.
+ * Base class for {@link MergingState} ({@link InternalMergingState}) that 
is stored on the heap.
  *
- * @param  The type of the key.
- * @param  The type of the namespace.
- * @param  The type of the values in the state.
- * @param  The type of State
- * @param  The type of StateDescriptor for the State S
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the input elements.
+ * @param  The type of the values in the state.
+ * @param The type of the output elements.
--- End diff --

indentation


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174385129
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 ---
@@ -40,6 +54,81 @@
  */
 public class KvStateRegistryTest extends TestLogger {
 
+   @Test
+   public void testKvStateEntry() throws InterruptedException {
+   final int threads = 10;
+
+   final CountDownLatch latch1 = new CountDownLatch(threads);
+   final CountDownLatch latch2 = new CountDownLatch(1);
+
+   final List<KvStateInfo> infos = 
Collections.synchronizedList(new ArrayList<>());
+
+   final JobID jobID = new JobID();
+
+   final JobVertexID jobVertexId = new JobVertexID();
+   final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+   final String registrationName = "foobar";
+
+   final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+   final KvStateID stateID = kvStateRegistry.registerKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   new DummyKvState()
+   );
+
+   for (int i = 0; i < threads; i++) {
+   new Thread(() -> {
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+   final KvStateInfo stateInfo = 
kvState.getInfoForCurrentThread();
+   infos.add(stateInfo);
+
+   latch1.countDown();
+   try {
+   latch2.await();
+   } catch (InterruptedException e) {
+   Assert.fail(e.getMessage());
+   }
+
+   }).start();
+   }
+
+   latch1.await();
+
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+
+   // verify that all the threads are done correctly.
+   Assert.assertEquals(threads, infos.size());
+   Assert.assertEquals(threads, kvState.getCacheSize());
+
+   latch2.countDown();
+
+   for (KvStateInfo infoA: infos) {
+   boolean found = false;
+   for (KvStateInfo infoB: infos) {
+   if (infoA == infoB) {
+   if (found) {
+   Assert.fail("Already found");
+   }
+   found = true;
+   } else {
+   Assert.assertTrue(infoA != infoB && 
infoA.equals(infoB));
+   }
+   }
+   }
+
+   kvStateRegistry.unregisterKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   stateID);
+
+   // we have to call for garbage collection to be sure that 
everything is cleared up.
--- End diff --

?


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174383043
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link 
KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+   private final InternalKvState<K, N, V> state;
+   private final KvStateInfo<K, N, V> stateInfo;
+
+   private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> 
serializerCache;
+
+   public KvStateEntry(final InternalKvState<K, N, V> state) {
+
+   this.state = Preconditions.checkNotNull(state);
+   this.stateInfo = new KvStateInfo<>(
+   state.getKeySerializer(),
+   state.getNamespaceSerializer(),
+   state.getValueSerializer()
+   );
+
+   this.serializerCache =
+   stateInfo.duplicate() == stateInfo
+   ? null  
// if the serializers are stateless, we do not need a 
cache
--- End diff --

An empty map would be acceptable here.
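
A rough sketch of what that could look like, using a generic per-thread cache rather than the real `KvStateEntry`. All names here are hypothetical stand-ins, and the `duplicator` function plays the role of `duplicate()`. The map is always allocated, so callers never need a null check; for stateless objects it simply stays empty.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the entry class; not the PR's implementation.
final class PerThreadCache<T> {

    private final T original;
    private final UnaryOperator<T> duplicator;
    private final boolean stateless;

    // Always non-null; stays empty when the original is stateless.
    private final ConcurrentMap<Thread, T> cache = new ConcurrentHashMap<>();

    PerThreadCache(T original, UnaryOperator<T> duplicator) {
        this.original = original;
        this.duplicator = duplicator;
        // If duplicating returns the same instance, it is safe to share it.
        this.stateless = duplicator.apply(original) == original;
    }

    T getForCurrentThread() {
        return stateless
            ? original
            : cache.computeIfAbsent(Thread.currentThread(), t -> duplicator.apply(original));
    }

    int getCacheSize() {
        return cache.size();
    }
}
```

The cost is one small unused map per stateless entry, which seems a fair trade for dropping the null handling.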


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174385024
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 ---
@@ -40,6 +54,81 @@
  */
 public class KvStateRegistryTest extends TestLogger {
 
+   @Test
+   public void testKvStateEntry() throws InterruptedException {
+   final int threads = 10;
+
+   final CountDownLatch latch1 = new CountDownLatch(threads);
+   final CountDownLatch latch2 = new CountDownLatch(1);
+
+   final List<KvStateInfo> infos = 
Collections.synchronizedList(new ArrayList<>());
+
+   final JobID jobID = new JobID();
+
+   final JobVertexID jobVertexId = new JobVertexID();
+   final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+   final String registrationName = "foobar";
+
+   final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+   final KvStateID stateID = kvStateRegistry.registerKvState(
+   jobID,
+   jobVertexId,
+   keyGroupRange,
+   registrationName,
+   new DummyKvState()
+   );
+
+   for (int i = 0; i < threads; i++) {
+   new Thread(() -> {
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+   final KvStateInfo stateInfo = 
kvState.getInfoForCurrentThread();
+   infos.add(stateInfo);
+
+   latch1.countDown();
+   try {
+   latch2.await();
+   } catch (InterruptedException e) {
+   Assert.fail(e.getMessage());
+   }
+
+   }).start();
+   }
+
+   latch1.await();
+
+   final KvStateEntry kvState = 
kvStateRegistry.getKvState(stateID);
+
+   // verify that all the threads are done correctly.
+   Assert.assertEquals(threads, infos.size());
+   Assert.assertEquals(threads, kvState.getCacheSize());
+
+   latch2.countDown();
+
+   for (KvStateInfo infoA: infos) {
+   boolean found = false;
+   for (KvStateInfo infoB: infos) {
+   if (infoA == infoB) {
+   if (found) {
+   Assert.fail("Already found");
--- End diff --

needs a better error message


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174382685
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
--- End diff --

you can retain type safety:

Call from the handler:
```
byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
```

Added method:
```
private static <K, N, V> byte[] getSerializedValue(
        KvStateEntry<K, N, V> entry,
        byte[] serializedKeyAndNamespace) throws Exception {

    InternalKvState<K, N, V> state = entry.getState();
    KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();

    return state.getSerializedValue(
        serializedKeyAndNamespace,
        infoForCurrentThread.getKeySerializer(),
        infoForCurrentThread.getNamespaceSerializer(),
        infoForCurrentThread.getStateValueSerializer()
    );
}
```


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174383696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
 ---
@@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
}
 
@Override
-   public byte[] getSerializedValue(K key, N namespace) throws IOException 
{
-   Preconditions.checkState(namespace != null, "No namespace 
given.");
-   Preconditions.checkState(key != null, "No key given.");
+   public byte[] getSerializedValue(
+   byte[] serializedKeyAndNamespace,
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws 
Exception {
 
-   HashMap<UK, UV> result = stateTable.get(key, namespace);
+   Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
 
-   if (null == result) {
+   Tuple2<K, N> keyAndNamespace = 
KvStateSerializer.deserializeKeyAndNamespace(
+   serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
+
+   Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, 
keyAndNamespace.f1);
+
+   if (result == null) {
return null;
}
 
-   TypeSerializer userKeySerializer = 
stateDesc.getKeySerializer();
-   TypeSerializer userValueSerializer = 
stateDesc.getValueSerializer();
+   final HashMapSerializer<UK, UV> serializer = 
(HashMapSerializer<UK, UV>) valueSerializer;
--- End diff --

This shouldn't be necessary. Why can't we just pass the map serializer into 
`serializeMap`?


---


[GitHub] flink issue #5677: [hotfix] [streaming API] update doc of InternalTimerServi...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5677
  
merging.


---


[GitHub] flink issue #5692: [docs] Fix typo in Python API docs

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5692
  
merging.


---


[GitHub] flink issue #5679: [FLINK-8916] Checkpointing Mode is always shown to be "At...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5679
  
merging.


---


[GitHub] flink issue #5670: [FLINK-8904][cli][tests] always restore the previous syso...

2018-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5670
  
merging.


---


[GitHub] flink pull request #5695: [FLINK-8704][tests] Port PartialConsumerPipelinedR...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5695

[FLINK-8704][tests] Port PartialConsumerPipelinedResultTest

## What is the purpose of the change

This PR ports the `PartialConsumerPipelinedResultTest` to flip6. The 
existing test was renamed to `LegacyPartialConsumerPipelinedResultTest`, and a 
ported copy was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8704_pipe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5695.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5695


commit 4baf286842849652d2b33654f93ef3cb7870c2d2
Author: zentol <chesnay@...>
Date:   2018-03-12T12:09:44Z

[FLINK-8704][tests] Port PartialConsumerPipelinedResultTest




---


[GitHub] flink pull request #5694: [FLINK-8704][tests] Port SlotCountExceedingParalle...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5694

[FLINK-8704][tests] Port SlotCountExceedingParallelismTest

## What is the purpose of the change

This PR ports the `SlotCountExceedingParallelismTest` to flip6. The 
existing test was renamed to `LegacySlotCountExceedingParallelismTest`, and a 
ported copy was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8704_slot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5694.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5694


commit bd6913fe5a430e9fa4b718c905ea16979eaab1d2
Author: zentol <chesnay@...>
Date:   2018-03-12T12:52:57Z

[FLINK-8704][tests] Port SlotCountExceedingParallelismTest




---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174148331
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

well we could move the thread-caching into the `InternalKvStates`.


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174144247
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Ah, I mistakenly thought that `getSerializedValue()` is also called outside of 
QS.

What would be the overhead of always duplicating serializers within 
`getSerializedValue()`?


---


[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5689#discussion_r174140412
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 ---
@@ -119,6 +121,11 @@ public void run() {
lock.release();
 
resumingThread.join();
+
+   Throwable exception = error.get();
+   if (exception != null) {
+   throw new AssertionError(exception);
+   }
--- End diff --

assertNull throws a NPE. This throws the actual exception with stacktrace 
etc and is the most equivalent to just throwing the exception.
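
For illustration, the pattern reduced to a standalone sketch. The helper class and method names are hypothetical; only the `AtomicReference` hand-off and the `new AssertionError(exception)` wrapping mirror the diff. Passing a `Throwable` to that constructor makes it the cause, so the original stack trace is preserved.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the PR.
final class ThreadFailurePropagation {

    static void runAndRethrow(Runnable task) {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                error.set(t); // remember the failure for the calling thread
            }
        }, "worker-under-test"); // a thread name helps when debugging

        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }

        Throwable exception = error.get();
        if (exception != null) {
            // the original exception becomes the cause of the AssertionError
            throw new AssertionError(exception);
        }
    }
}
```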


---


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174128377
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Couldn't we synchronize on `kvState` instead of modifying all 
`InternalKvState` implementations?

This seems like a much safer alternative than baking in the assumption that 
`getSerializedValue()` can be called concurrently.
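
Roughly what I have in mind, as a simplified sketch. The interface below is a hypothetical stand-in for the state object, not the real `InternalKvState` signature.

```java
// Hypothetical sketch, not the PR's code.
final class SynchronizedAccess {

    /** Stand-in for a state object whose read path is not thread-safe. */
    interface NotThreadSafeState {
        byte[] getSerializedValue(byte[] serializedKeyAndNamespace);
    }

    static byte[] query(NotThreadSafeState state, byte[] serializedKeyAndNamespace) {
        // Only one query thread at a time may touch the shared state object.
        synchronized (state) {
            return state.getSerializedValue(serializedKeyAndNamespace);
        }
    }
}
```

This serializes all queries against the same state, so it trades throughput for not having to touch every implementation.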



---


[GitHub] flink pull request #5690: [FLINK-8935][tests] Extend MiniClusterClient

2018-03-13 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5690

[FLINK-8935][tests] Extend MiniClusterClient

## What is the purpose of the change

With this PR the `MiniClusterClient` implements `listJobs()`, 
`getAccumulators()`, `triggerSavepoint()` and `stop()`. 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8935

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5690


commit 4382ae8aa781ec7bc469fb6ceed551434478b342
Author: zentol <chesnay@...>
Date:   2018-03-07T10:05:12Z

[FLINK-8935][tests] Implement MiniClusterClient#listJobs

commit f15bc13d2b749d300dfb3762ab8faf36e963aa4f
Author: zentol <chesnay@...>
Date:   2018-03-07T10:05:42Z

[FLINK-8935][tests] Implement MiniClusterClient#getAccumulators

commit 5ae94c92906784620fc4fd7e6dbcc4c5ff62683d
Author: zentol <chesnay@...>
Date:   2018-03-06T12:26:59Z

[FLINK-8935][tests] Implement MiniClusterClient#triggerSavepoint

commit a263e3f39899c3d6b0cfbf623609007ba03f0f0b
Author: zentol <chesnay@...>
Date:   2018-03-07T12:02:27Z

[FLINK-8935][tests] Implement MiniClusterClient#stop




---


[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...

2018-03-13 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5689

[FLINK-4569][tests] Respect exceptions thrown in thread in 
JobRetrievalITCase

## What is the purpose of the change

This PR ensures that exceptions that occur in the submitting thread 
actually fail the test. Also contains minor fixes, like setting a thread name 
(for debugging purposes) and releasing the lock in the `SemaphoreInvokable` to 
allow multiple runs in the IDE.

## Verifying the change

One can modify the test to throw an exception after connecting 
successfully, which will fail the test.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 4569

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5689


commit 13c383d439550b1ebdc869981a7ffa19a6c8a151
Author: zentol <chesnay@...>
Date:   2018-03-13T12:00:47Z

[FLINK-4569][tests] Respect exceptions thrown in thread in 
JobRetrievalITCase




---


[GitHub] flink pull request #5687: [FLINK-8934] [flip6] Properly cancel slot requests...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5687#discussion_r174060373
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ---
@@ -505,17 +513,21 @@ public void 
testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception
} catch (ExecutionException ee) {
// expected

assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
FlinkException);
-
}
 
-   final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+   assertEquals(allocationId1, 
canceledSlotRequests.take());
+
+   final SlotOffer slotOffer = new 
SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
 

slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
 

assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
 
// the slot offer should fulfill the second slot request
-   assertEquals(allocationId, 
slotFuture2.get().getAllocationId());
+   assertEquals(allocationId1, 
slotFuture2.get().getAllocationId());
+
+   // check that the second slot request has been canceled
--- End diff --

Let's see if I understood the scenario here correctly:

We request the allocation of 2 slots. We cancel the first allocation 
request, but a TaskManager has already offered a slot to fulfill it.
We now have one pending allocation request and one offered slot, so we 
re-use the slot for the second request, which we can do since both requests 
were for the same job with the same resource requirements.

I think we can improve the wording a bit though, as this comment here says 
that the second request has been canceled, when just above it was fulfilled. I 
guess it should say that the slot _acquisition_ (i.e. the retrieval of slots 
from the RM) has been canceled. (also applies to the canceledSlotRequests 
variable)


---


[GitHub] flink pull request #5681: [FLINK-4569][tests] Fix JobRetrievalITCase

2018-03-12 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5681


---


[GitHub] flink pull request #5681: [FLINK-4569][tests] Fix JobRetrievalITCase

2018-03-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5681

[FLINK-4569][tests] Fix JobRetrievalITCase

## What is the purpose of the change

This PR contains 2 changes to the `JobRetrievalITCase`.

1) We now store exceptions that occur in the submitting thread in an 
`AtomicReference` that we evaluate at the end of the test.

2) It could happen that the lock (preventing the job from terminating) was 
released before the client had a chance to finish the retrieval. To remedy this 
we now only release the lock if the thread has terminated, either by 
successfully connecting or failing with an error.
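
A minimal sketch of the pattern described in 1) and 2), using only the JDK. The class name, the thread name and the `runAndCapture` helper are hypothetical and not taken from `JobRetrievalITCase`; the real test would release its lock after the join and fail via JUnit.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ThreadErrorCaptureSketch {

    /**
     * Runs the action on a named helper thread and returns whatever it threw,
     * or null if it completed normally.
     */
    static Throwable runAndCapture(Runnable action) {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread submitter = new Thread(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                // Without this, the exception would only end the helper thread
                // and the test itself would still pass.
                error.set(t);
            }
        }, "submitter-thread"); // hypothetical name, set for debugging purposes

        submitter.start();
        try {
            // Wait until the thread has terminated, successfully or not.
            // Only after this point would the test release the lock that
            // keeps the job from terminating.
            submitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for submitter", e);
        }

        return error.get();
    }

    public static void main(String[] args) {
        Throwable failure = runAndCapture(() -> {
            throw new IllegalStateException("simulated failure in submitting thread");
        });
        if (failure != null) {
            // In a JUnit test this would be a rethrow or Assert.fail(...).
            System.out.println("test would fail with: " + failure);
        }
    }
}
```

Joining before evaluating the reference is what makes the check reliable: the reference is only read once the thread can no longer write to it.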



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 4569

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5681


commit d45d8883365ef73c7064adfd098feea208d2ca4c
Author: zentol <chesnay@...>
Date:   2018-03-12T11:01:57Z

[FLINK-4569][tests] Fix JobRetrievalITCase




---


[GitHub] flink pull request #5665: [FLINK-8703][tests] Port WebFrontendITCase to Mini...

2018-03-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5665#discussion_r173743909
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 ---
@@ -66,40 +73,44 @@
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
 
-   private static LocalFlinkMiniCluster cluster;
+   private static final Configuration CLUSTER_CONFIGURATION = 
getClusterConfiguration();
 
-   private static int port = -1;
+   @ClassRule
+   public static final MiniClusterResource CLUSTER = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   CLUSTER_CONFIGURATION,
+   NUM_TASK_MANAGERS,
+   NUM_SLOTS),
+   true
+   );
 
-   @BeforeClass
-   public static void initialize() throws Exception {
+   private static Configuration getClusterConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TASK_MANAGERS);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS);
-   config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
-   File logDir = File.createTempFile("TestBaseUtils-logdir", null);
-   assertTrue("Unable to delete temp file", logDir.delete());
-   assertTrue("Unable to create temp directory", logDir.mkdir());
-   File logFile = new File(logDir, "jobmanager.log");
-   File outFile = new File(logDir, "jobmanager.out");
-
-   Files.createFile(logFile.toPath());
-   Files.createFile(outFile.toPath());
+   try {
+   File logDir = 
File.createTempFile("TestBaseUtils-logdir", null);
--- End diff --

Making it a rule is a bit tricky since you end up with a dependency between 
rules as the cluster is also one.


---


[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...

2018-03-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5679#discussion_r173742203
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
 ---
@@ -143,10 +152,66 @@ public int hashCode() {
}
 
/**
-* Processing mode.
+* JSON serializer for {@link ProcessingMode}.
 */
+   @JsonSerialize(using = ProcessingModeSerializer.class)
+   @JsonDeserialize(using = ProcessingModeDeserializer.class)
public enum ProcessingMode {
-   AT_LEAST_ONCE,
-   EXACTLY_ONCE
+   AT_LEAST_ONCE("at_least_once"),
+   EXACTLY_ONCE("exactly_once");
+
+   private String value;
+
+   ProcessingMode(String value) {
+   this.value = value;
+   }
+
+   public String getValue() {
+   return value;
+   }
+
+   public static ProcessingMode fromString(String value) {
--- End diff --

these changes to the enum seem unnecessary


---


[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...

2018-03-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5679#discussion_r173742094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
 ---
@@ -143,10 +152,66 @@ public int hashCode() {
}
 
/**
-* Processing mode.
+* JSON serializer for {@link ProcessingMode}.
 */
+   @JsonSerialize(using = ProcessingModeSerializer.class)
+   @JsonDeserialize(using = ProcessingModeDeserializer.class)
public enum ProcessingMode {
-   AT_LEAST_ONCE,
-   EXACTLY_ONCE
+   AT_LEAST_ONCE("at_least_once"),
+   EXACTLY_ONCE("exactly_once");
+
+   private String value;
+
+   ProcessingMode(String value) {
+   this.value = value;
+   }
+
+   public String getValue() {
+   return value;
+   }
+
+   public static ProcessingMode fromString(String value) {
+   for (ProcessingMode mode : ProcessingMode.values()) {
+   if (mode.value.equalsIgnoreCase(value)) {
+   return mode;
+   }
+   }
+
+   throw new IllegalArgumentException("No constant with 
value " + value + " found");
+   }
+
+   }
+
+   /**
+* JSON deserializer for {@link ProcessingMode}.
+*/
+   public static class ProcessingModeSerializer extends 
StdSerializer {
+
+   public ProcessingModeSerializer() {
+   super(ProcessingMode.class);
+   }
+
+   @Override
+   public void serialize(ProcessingMode mode, JsonGenerator 
generator, SerializerProvider serializerProvider)
+   throws IOException {
+   generator.writeString(mode.getValue());
+   }
}
+
+   /**
+* Processing mode deserializer.
+*/
+   public static class ProcessingModeDeserializer extends 
StdDeserializer {
+
+   public ProcessingModeDeserializer() {
+   super(ProcessingMode.class);
+   }
+
+   @Override
+   public ProcessingMode deserialize(JsonParser jsonParser, 
DeserializationContext deserializationContext)
+   throws IOException {
+   return 
ProcessingMode.fromString(jsonParser.getValueAsString());
--- End diff --

replace with 
`ProcessingMode.valueOf(jsonParser.getValueAsString().toUpperCase())`


---


[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...

2018-03-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5679#discussion_r173741952
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
 ---
@@ -143,10 +152,66 @@ public int hashCode() {
}
 
/**
-* Processing mode.
+* JSON serializer for {@link ProcessingMode}.
 */
+   @JsonSerialize(using = ProcessingModeSerializer.class)
+   @JsonDeserialize(using = ProcessingModeDeserializer.class)
public enum ProcessingMode {
-   AT_LEAST_ONCE,
-   EXACTLY_ONCE
+   AT_LEAST_ONCE("at_least_once"),
+   EXACTLY_ONCE("exactly_once");
+
+   private String value;
+
+   ProcessingMode(String value) {
+   this.value = value;
+   }
+
+   public String getValue() {
+   return value;
+   }
+
+   public static ProcessingMode fromString(String value) {
+   for (ProcessingMode mode : ProcessingMode.values()) {
+   if (mode.value.equalsIgnoreCase(value)) {
+   return mode;
+   }
+   }
+
+   throw new IllegalArgumentException("No constant with 
value " + value + " found");
+   }
+
+   }
+
+   /**
+* JSON deserializer for {@link ProcessingMode}.
+*/
+   public static class ProcessingModeSerializer extends 
StdSerializer {
+
+   public ProcessingModeSerializer() {
+   super(ProcessingMode.class);
+   }
+
+   @Override
+   public void serialize(ProcessingMode mode, JsonGenerator 
generator, SerializerProvider serializerProvider)
+   throws IOException {
+   generator.writeString(mode.getValue());
--- End diff --

replace with `generator.writeString(mode.name().toLowerCase())`
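
Taken together, the review comments on this PR suggest that the enum can stay as it was and the (de)serializers can derive the JSON value from the constant name. A small JDK-only sketch of that round trip follows; the helper names `toJsonValue` and `fromJsonValue` are hypothetical stand-ins for the bodies of the Jackson serializer and deserializer, and passing `Locale.ROOT` is an addition not present in the review comments.

```java
import java.util.Locale;

final class ProcessingModeNamesSketch {

    /** The enum without the extra field, getter, or lookup method. */
    enum ProcessingMode {
        AT_LEAST_ONCE,
        EXACTLY_ONCE
    }

    /** What the serializer would write. */
    static String toJsonValue(ProcessingMode mode) {
        return mode.name().toLowerCase(Locale.ROOT);
    }

    /** What the deserializer would return; throws IllegalArgumentException for unknown values. */
    static ProcessingMode fromJsonValue(String value) {
        return ProcessingMode.valueOf(value.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(toJsonValue(ProcessingMode.EXACTLY_ONCE)); // exactly_once
        System.out.println(fromJsonValue("at_least_once"));           // AT_LEAST_ONCE
    }
}
```

Upper-casing the input keeps parsing case-insensitive, which matches the `equalsIgnoreCase` lookup in the `fromString` method under review.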


---


[GitHub] flink issue #5673: [FLINK-8832] [sql-client] Create a SQL Client Kafka 0.11 ...

2018-03-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5673
  
How is this any different to a regular fat jar? I'm wondering what the 
purpose of the `sql-jar` suffix is.


---


[GitHub] flink pull request #5670: [FLINK-8904][cli][tests] always restore the previo...

2018-03-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5670#discussion_r173627882
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java
 ---
@@ -65,9 +67,14 @@ public static String getInvalidConfigDir() {
}
 
public static void pipeSystemOutToNull() {
+   previousSysout = System.out;
--- End diff --

Does it have to be done this way or could we make `previousSysout` final?
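
For illustration, a sketch of what the `final` variant could look like, assuming the original stream is captured once when the class is initialized. The class name, the `ORIGINAL_SYSOUT` field and the `restoreSystemOut()` method are hypothetical and not taken from `CliFrontendTestUtils`.

```java
import java.io.OutputStream;
import java.io.PrintStream;

final class SysoutPipeSketch {

    // Captured once at class initialization, so the field can be final.
    // Caveat: this only holds the "real" stream if nothing replaced
    // System.out before this class was initialized.
    private static final PrintStream ORIGINAL_SYSOUT = System.out;

    static void pipeSystemOutToNull() {
        System.setOut(new PrintStream(new OutputStream() {
            @Override
            public void write(int b) {
                // discard everything
            }
        }));
    }

    static void restoreSystemOut() {
        System.setOut(ORIGINAL_SYSOUT);
    }

    public static void main(String[] args) {
        pipeSystemOutToNull();
        System.out.println("this line is swallowed");
        restoreSystemOut();
        System.out.println("this line is visible");
    }
}
```

The caveat in the field comment is the trade-off: assigning in `pipeSystemOutToNull()` always restores whatever was installed right before the call, while the final field always restores the stream seen at class load.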


---


[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...

2018-03-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5609
  
ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list 
the `-E` option.

If we want to be super safe we could check that `sed -r` is supported and 
use `-E` as a backup.


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r173455497
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception {
public void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
-
--- End diff --

ah right, forgot to comment on that. I found these tests to be rather..._odd_. 
They check behavior when not enough slots are available, but in the old code 
afaik this fails before the client even submits the job, and in flip6 this 
stalls as we never check whether enough slots are available (I guess with the 
underlying assumption that we would just allocate more TMs until we have 
enough).


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5669

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource 

## What is the purpose of the change

Ports the `KafkaTestBase` and extending classes to use 
`MiniClusterResource`.

## Verifying this change

Run all tests extending `KafkaTestBase` with `flip6` profile 
enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_kafkaB

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5669


commit 545b4dcccf79ce1b8bb530bae77ab4c2b6e85351
Author: zentol <chesnay@...>
Date:   2018-03-07T12:38:03Z

Remove Kafka testFailOnDeploy test

commit 3cdfd906cdefa85fe36b6717f41e831ca7e5ea72
Author: zentol <chesnay@...>
Date:   2018-03-07T12:39:25Z

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource




---


[GitHub] flink pull request #5668: [FLINK-8703][tests] Port StreamingScalabilityAndLa...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5668

[FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniCluste…

## What is the purpose of the change

Ports the `StreamingScalabilityAndLatency` to use `MiniClusterResource`.

## Verifying this change

Run `StreamingScalabilityAndLatency` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_stream

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5668


commit 08f56f6e954a37c9135bac891ad0bb6a31dcc8b0
Author: zentol <chesnay@...>
Date:   2018-02-27T14:21:50Z

[FLINK-8703][tests] Port StreamingScalabilityAndLatency to 
MiniClusterResource




---


[GitHub] flink pull request #5667: [FLINK-8703][tests] Port NotSoMiniClusterIteration...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5667

[FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource

## What is the purpose of the change

Ports the `NotSoMiniClusterIterations` to use `MiniClusterResource`.

## Verifying this change

Run `NotSoMiniClusterIterations` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_iter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5667.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5667


commit 858b2e4a31ad186b718c529a70757fda428a92ef
Author: zentol <chesnay@...>
Date:   2018-02-27T14:19:50Z

[FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource




---


[GitHub] flink pull request #5666: [FLINK-8703][tests] Port KafkaShortRetentionTestBa...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5666

[FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterRe…

## What is the purpose of the change

Ports the `KafkaShortRetentionTestBase` to use `MiniClusterResource`.

## Verifying this change

Run `KafkaShortRetentionTestBase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_kafke_short

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5666.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5666


commit 2aa3b9cdb39f812d40fb0be88265d1b36b631661
Author: zentol <chesnay@...>
Date:   2018-02-27T10:11:59Z

[FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterResource




---


[GitHub] flink pull request #5665: [FLINK-8703][tests] Port WebFrontendITCase to Mini...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5665

[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource

## What is the purpose of the change

Ports the `WebFrontendITCase` to use `MiniClusterResource`.

## Brief change log

* implement prerequisite `MiniClusterClient#listJobs()`
* modify `MiniClusterResource` to expose WebUI/REST port
* make `MiniClusterResource#CODEBASE_KEY/FLIP6_CODEBASE` public to support 
version dependent test behavior (not pretty but the alternative would be a full 
copy of the test)
* port WebFrontendITCase

## Verifying this change

Run `WebFrontendITCase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_web

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5665.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5665


commit 41c5737545897506470f531394269b94ebe2ef12
Author: zentol <chesnay@...>
Date:   2018-03-07T10:05:12Z

[FLINK-8811][tests] Implement MiniClusterClient#listJobs

commit 98f366238cddcef38fff08b14764c9120dbcccea
Author: zentol <chesnay@...>
Date:   2018-03-07T10:14:20Z

[FLINK-8703][tests] Expose WebUI port

commit cb903f3681c808a3e5011211af6b7d174f86a23e
Author: zentol <chesnay@...>
Date:   2018-03-07T10:14:46Z

[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource




---


[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5664

[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

## What is the purpose of the change

Ports the `CancelingTestBase` to use `MiniClusterResource`.

## Verifying this change

Run `MapCancelingTestBase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_canceling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5664.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5664


commit 57338df4819b2324f7ede2b131f81d83bc9096b2
Author: zentol <chesnay@...>
Date:   2018-02-26T14:36:37Z

[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

commit 22d4a2f02c256eb41a1684a5766a1dd53dc9351d
Author: zentol <chesnay@...>
Date:   2018-02-28T12:43:42Z

[hotfix][tests] Properly disable JoinCancelingITCase




---


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
please do not trigger builds just to get that perfect green build. The test 
failure here is quite common at the moment.


---


[GitHub] flink issue #5657: [FLINK-8887][tests] Add single retry in MiniClusterClient

2018-03-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5657
  
merging.


---


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua This test failure is unrelated, you can ignore it.


---


[GitHub] flink issue #5554: [FLINK-8729][streaming] Refactor JSONGenerator to use jac...

2018-03-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5554
  
merging.


---


[GitHub] flink issue #5637: [FLINK-8860][flip6] stop SlotManager spamming logs for ev...

2018-03-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5637
  
merging.


---


[GitHub] flink pull request #5594: [FLINK-8800][REST] Reduce logging of all requests ...

2018-03-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5594#discussion_r173266836
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -84,8 +84,8 @@ protected AbstractHandler(
 
@Override
protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, T gateway) throws Exception {
-   if (log.isDebugEnabled()) {
-   log.debug("Received request " + 
routed.request().getUri() + '.');
+   if (log.isTraceEnabled()) {
--- End diff --

Yes, we could simplify this; however, we would be relying on implementation 
details of 2 libraries.
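
What the explicit level check buys can be shown with a small sketch. To stay dependency-free it uses `java.util.logging` instead of the logger used in `AbstractHandler`, so this is a substitution for illustration only; the class, the counter and `describeRequest` are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class GuardedLoggingSketch {

    static final Logger LOG = Logger.getLogger(GuardedLoggingSketch.class.getName());

    /** Counts how often the (potentially expensive) message was actually built. */
    static int messagesBuilt = 0;

    static String describeRequest(String uri) {
        messagesBuilt++;
        return "Received request " + uri + '.';
    }

    static void handleUnguarded(String uri) {
        // The argument is evaluated before the logger can decide to drop it.
        LOG.finest(describeRequest(uri));
    }

    static void handleGuarded(String uri) {
        // The message is only built when the level is enabled.
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.finest(describeRequest(uri));
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO); // FINEST is disabled
        handleUnguarded("/jobs");
        handleGuarded("/jobs");
        System.out.println("messages built: " + messagesBuilt); // messages built: 1
    }
}
```

With the guard in place, the cost of building the message does not depend on how cheap the called getters are or on how the logging library treats its arguments.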


---


[GitHub] flink issue #5651: [FLINK-8889][tests] Do not override cluster config values

2018-03-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5651
  
merging.


---


[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172922843
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() 
and
+   // deserialize() do the checks and the current mechanism does 
not handle
--- End diff --

looks like the end of the comment is missing


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893483
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
+
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_J

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893235
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172893030
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   if (CONCURRENT_ACCESS_CHECK) {
+   enterExclusiveThread();
+   }
+
+   try {
+   T value = deserialize(source);
--- End diff --

Have to point out that after `deserialize()` the checks in `copy()` are 
ineffective, as the `currentThread` field has already been nulled. In other 
words, we guard against concurrent access before `deserialize()`, and within 
`deserialize()` and `serialize()`, but not between `deserialize()` and 
`serialize()`, or after `serialize()`.

This isn't a _problem_ as all code is actually covered, but we may want to 
document that.
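
To illustrate the gap described here, a minimal sketch, assuming the guard is a plain "remember the current thread" field that any exit clears. `GuardGapSketch`, `enter()`, `exit()` and the `trace` list are hypothetical names for illustration, not the actual `AvroSerializer` code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a serializer with a non-counting exclusive-thread guard. */
final class GuardGapSketch {

    // Thread currently inside a guarded section, or null if none (assumed semantics).
    private volatile Thread currentThread;

    // Records whether the guard was held at each step; exists only for illustration.
    final List<String> trace = new ArrayList<>();

    private void enter() {
        Thread previous = currentThread;
        Thread self = Thread.currentThread();
        if (previous == null) {
            currentThread = self;
        } else if (previous != self) {
            throw new IllegalStateException("Concurrent access from " + self.getName());
        }
    }

    private void exit() {
        // Not a counter: the innermost exit clears the guard for outer callers as well.
        currentThread = null;
    }

    private void record(String step) {
        trace.add(step + "=" + (currentThread != null));
    }

    private void deserialize() {
        enter();
        try {
            record("in-deserialize");
        } finally {
            exit();
        }
    }

    private void serialize() {
        enter();
        try {
            record("in-serialize");
        } finally {
            exit();
        }
    }

    void copy() {
        enter();
        try {
            record("before-deserialize");
            deserialize();
            record("between");
            serialize();
            record("after-serialize");
        } finally {
            exit();
        }
    }
}
```

Calling `copy()` once and inspecting `trace` shows the guard held before and inside the nested calls, but not between them or after `serialize()`. A re-entrancy counter would close the gap, at the cost of a slightly heavier check.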


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172857716
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
+
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_J

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172856770
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
+
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_J

[GitHub] flink issue #5656: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour ...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5656
  
We should probably remove the restore/QS tests from this branch and rebase 
it.


---


[GitHub] flink pull request #5657: [FLINK-8887][tests] Add single retry in MiniCluste...

2018-03-07 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5657

[FLINK-8887][tests] Add single retry in MiniClusterClient

## What is the purpose of the change

This PR presents a test workaround for race-conditions in FLIP-6 (most 
notably FLINK-8887). Basically, every `MiniClusterClient` call is retried 
*once* after 500ms in case of certain exceptions.

**This is only a band-aid until a proper fix is in place** so we can 
finally continue merging more test ports.

## Brief change log

* add `guardWithSingleRetry` convenience method
* add `ScheduledExecutor` to `MiniClusterClient`
* guard all calls to the `MiniCluster`
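
A rough sketch of what such a single-retry guard could look like, assuming the guarded calls return a `CompletableFuture`. Only the method name `guardWithSingleRetry` comes from the change log above; the signature, the `isRetryable` predicate and the helper methods are assumptions for illustration and may differ from the actual PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of a retry-once guard around an asynchronous call. */
final class SingleRetrySketch {

    static <T> CompletableFuture<T> guardWithSingleRetry(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> isRetryable,
            long delayMillis,
            ScheduledExecutorService executor) {

        CompletableFuture<T> result = new CompletableFuture<>();

        operation.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
                return;
            }
            Throwable cause = unwrap(failure);
            if (!isRetryable.test(cause)) {
                // Unrelated failures are passed on untouched so they are not hidden.
                result.completeExceptionally(cause);
                return;
            }
            // Exactly one more attempt after the delay; its outcome is final.
            executor.schedule(() -> {
                try {
                    forward(operation.get(), result);
                } catch (Throwable t) {
                    result.completeExceptionally(t);
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        });

        return result;
    }

    private static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, failure) -> {
            if (failure == null) {
                target.complete(value);
            } else {
                target.completeExceptionally(unwrap(failure));
            }
        });
    }

    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

Restricting the retry to a predicate keeps the band-aid narrow: anything other than the known race-condition exceptions still fails the test on the first attempt.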

## Verifying this change

The change can be verified by cherry-picking [this 
branch](https://github.com/zentol/flink/tree/8797) and running the 
`AbstractOperatorRestoreTestBase`. Before this change there were always 1-2 
tests failing, whereas now none should fail.

/cc @aljoscha @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8887_bandaid

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5657.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5657


commit f685fad6731b7c1774b247985a446260ea285663
Author: zentol <chesnay@...>
Date:   2018-03-07T14:02:05Z

[FLINK-8887][tests] Add single retry in MiniClusterClient




---


[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172835665
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -404,7 +406,29 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
if (jobManagerRunner != null) {
-   return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
+   CompletableFuture statusFuture = 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
+   statusFuture.handle((JobStatus status, 
Throwable throwable) -> {
+   if (throwable != null) {
+   Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
+
+   if (error instanceof 
FencingTokenException) {
--- End diff --

missing else block causes other exceptions to be swallowed...
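
For illustration, a minimal sketch of a `handle` callback that resolves one specific exception and rethrows everything else. `StaleLeaderException` is a hypothetical stand-in for the exception the workaround targets, and `withFallback` is an invented helper, not code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch: recover from one known exception, propagate all others. */
final class HandleElseSketch {

    /** Stand-in for the specific exception the workaround is meant to cover. */
    static final class StaleLeaderException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    static CompletableFuture<String> withFallback(
            CompletableFuture<String> statusFuture, String fallback) {

        return statusFuture.handle((status, throwable) -> {
            if (throwable == null) {
                return status;
            }
            Throwable error =
                (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;

            if (error instanceof StaleLeaderException) {
                return fallback;
            } else {
                // Without this branch the lambda would return null and hide the failure.
                throw new CompletionException(error);
            }
        });
    }
}
```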


---


[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5653#discussion_r172812790
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -290,16 +289,14 @@ public static boolean checkpointsMatch(
Collection first,
Collection second) {
--- End diff --

Ah, I would've expected that CompletedCheckpoint has an equals method, 
never mind then.


---


[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5653#discussion_r172808666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -290,16 +289,14 @@ public static boolean checkpointsMatch(
Collection first,
Collection second) {
--- End diff --

change type to List? then we could skip the copy and do the comparison 
directly.


---


[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5653#discussion_r172807476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -290,16 +289,14 @@ public static boolean checkpointsMatch(
Collection first,
Collection second) {
 
-   Set<Tuple2<Long, JobID>> firstInterestingFields =
-   new HashSet<>();
+   List<Tuple2<Long, JobID>> firstInterestingFields = new 
ArrayList<>();
--- End diff --

initialize with `first.size()`


---


[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5653#discussion_r172809760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -290,16 +289,14 @@ public static boolean checkpointsMatch(
Collection first,
Collection second) {
 
-   Set<Tuple2<Long, JobID>> firstInterestingFields =
-   new HashSet<>();
+   List<Tuple2<Long, JobID>> firstInterestingFields = new 
ArrayList<>();
 
for (CompletedCheckpoint checkpoint : first) {
--- End diff --

For stability purposes we may want to check whether the input collections 
are actually sorted. I know that the sole user, 
`ZooKeeperCompletedCheckpointStore`, does in fact pass sorted lists, but 
technically there's no guarantee at the moment.
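
A sketch of what such a check could look like, assuming the inputs are lists and that "sorted" means ascending checkpoint ID. The `Checkpoint` class below is a hypothetical stand-in with only the two compared fields, and rejecting unsorted input with an exception is one possible choice, not what the PR does.

```java
import java.util.List;

/** Hypothetical sketch: order-sensitive comparison that first verifies sortedness. */
final class CheckpointsMatchSketch {

    /** Stand-in carrying only the fields that the comparison looks at. */
    static final class Checkpoint {
        final long checkpointId;
        final String jobId;

        Checkpoint(long checkpointId, String jobId) {
            this.checkpointId = checkpointId;
            this.jobId = jobId;
        }
    }

    static boolean isSortedById(List<Checkpoint> checkpoints) {
        for (int i = 1; i < checkpoints.size(); i++) {
            if (checkpoints.get(i - 1).checkpointId > checkpoints.get(i).checkpointId) {
                return false;
            }
        }
        return true;
    }

    static boolean checkpointsMatch(List<Checkpoint> first, List<Checkpoint> second) {
        if (!isSortedById(first) || !isSortedById(second)) {
            throw new IllegalArgumentException("Checkpoints must be sorted by checkpoint ID");
        }
        if (first.size() != second.size()) {
            return false;
        }
        // Compare position by position; no intermediate copy of the fields is needed.
        for (int i = 0; i < first.size(); i++) {
            Checkpoint a = first.get(i);
            Checkpoint b = second.get(i);
            if (a.checkpointId != b.checkpointId || !a.jobId.equals(b.jobId)) {
                return false;
            }
        }
        return true;
    }
}
```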


---


[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5653#discussion_r172807512
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -290,16 +289,14 @@ public static boolean checkpointsMatch(
Collection first,
Collection second) {
 
-   Set<Tuple2<Long, JobID>> firstInterestingFields =
-   new HashSet<>();
+   List<Tuple2<Long, JobID>> firstInterestingFields = new 
ArrayList<>();
 
for (CompletedCheckpoint checkpoint : first) {
firstInterestingFields.add(
new Tuple2<>(checkpoint.getCheckpointID(), 
checkpoint.getJobId()));
}
 
-   Set<Tuple2<Long, JobID>> secondInterestingFields =
-   new HashSet<>();
+   List<Tuple2<Long, JobID>> secondInterestingFields = new 
ArrayList<>();
--- End diff --

initialize with `second.size()`


---


[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5652
  
yup. But one profile is already scratching the 50m limit as is :/


---


[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5652
  
The alternative would be to make the `ClusterClient` functionality optional 
and force tests to explicitly enable it.


---


[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5652
  
All legacy tests going through the `MiniClusterResource` will take longer. 
I don't know by how much, but we now have to start multiple actor systems and 
the JM<->TM communication is no longer local.


---


[GitHub] flink pull request #5652: [hotfix][tests] Do not use singleActorSystem in Lo...

2018-03-07 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5652

[hotfix][tests] Do not use singleActorSystem in LocalFlinkMiniCluster

## What is the purpose of the change

The legacy cluster started in `MiniClusterResource` used a single actor 
system, which rendered the returned `ClusterClient` unusable.

This change will unfortunately cause tests to take longer, but I don't know 
of another way to fix this.

Before this change, every access to the client failed with the exception below:
```
org.apache.flink.client.program.ProgramInvocationException: Failed to 
retrieve the JobManager gateway.

at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:513)

at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:113)

Caused by: org.apache.flink.util.FlinkException: Could not find out our own 
hostname by connecting to the leading JobManager. Please make sure that the 
Flink cluster has been started.

at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:248)

at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:923)

at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:511)

... 30 more

Caused by: 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not 
find the connecting address by connecting to the current leader.

at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:164)

at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:145)

at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:244)

... 32 more

Caused by: 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not 
retrieve the connecting address to the current leader with the akka URL 
akka://flink/user/jobmanager_1.

at 
org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:472)

at 
org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:361)

at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:162)

... 34 more

Caused by: java.lang.Exception: Could not retrieve InetSocketAddress from 
Akka URL akka://flink/user/jobmanager_1

at 
org.apache.flink.runtime.akka.AkkaUtils$.getInetSocketAddressFromAkkaURL(AkkaUtils.scala:709)

at 
org.apache.flink.runtime.akka.AkkaUtils.getInetSocketAddressFromAkkaURL(AkkaUtils.scala)

at 
org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:392)

... 36 more
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink hotfix_single

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5652


commit 6a105cbb194b87dec98224b985ee5ceb9239d492
Author: zentol <chesnay@...>
Date:   2018-03-05T12:45:33Z

[hotfix][tests] Do not use singleActorSystem in LocalFlinkMiniCluster

Using a singleActorSystem rendered the returned client unusable.




---


[GitHub] flink pull request #5651: [FLINK-8889][tests] Do not override cluster config...

2018-03-07 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5651

[FLINK-8889][tests] Do not override cluster config values

## What is the purpose of the change

This PR modifies the cluster configuration in `MiniClusterResource` and 
`TestBaseUtils` so that several options are no longer overridden 
unconditionally. They are now only set if they weren't previously configured 
(i.e. by the test itself). 
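
The "only set if not configured" behaviour can be sketched as follows, using a plain `Map` as a stand-in for the cluster configuration. `withDefaults` and the option keys used with it are hypothetical; the real change operates on Flink's `Configuration` class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: apply defaults without overriding values set by the test. */
final class ConfigDefaultsSketch {

    static Map<String, String> withDefaults(
            Map<String, String> testConfig, Map<String, String> defaults) {

        // Start from what the test configured, then fill in only the missing keys.
        Map<String, String> effective = new HashMap<>(testConfig);
        defaults.forEach(effective::putIfAbsent);
        return effective;
    }
}
```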

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8889

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5651


commit dafff1946e20e6f3884398b705bbeb4f70462efa
Author: zentol <chesnay@...>
Date:   2018-03-07T09:18:03Z

[FLINK-8889][tests] Do not override cluster config values




---


[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172775779
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -404,16 +404,25 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
if (jobManagerRunner != null) {
-   return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
-   } else {
-   final JobDetails jobDetails = 
archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-   if (jobDetails != null) {
-   return 
CompletableFuture.completedFuture(jobDetails.getStatus());
-   } else {
-   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   try {
+   return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

This is an asynchronous call, so the exception isn't thrown here. You have to 
add a handler to the returned `CompletableFuture`. It also only properly 
resolves one of the exceptions, and IMO it shouldn't catch `Exception` but only 
the specific exceptions the workaround is meant for, so as not to hide other 
issues.

In any case, I'm not sure if adding workarounds to the Dispatcher is the 
right way to go. These issues revealed that some scenarios are not properly 
handled, and I would prefer waiting for @tillrohrmann to really fix this in the 
Dispatcher and related components.

We can temporarily handle both exceptions in the `MiniClusterClient` by 
adding a *single* retry (with a short sleep) if a **specific** exception occurs.
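
To make the first point concrete, a small sketch showing that a `try`/`catch` around an asynchronous call never sees a failure that is delivered through the returned future. `requestStatus()` is a hypothetical stand-in for the gateway call, not the real `requestJobStatus`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: try/catch vs. a handler on an exceptionally completed future. */
final class AsyncFailureSketch {

    /** Stand-in for an asynchronous request that fails. */
    static CompletableFuture<String> requestStatus() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("no leader"));
        return future;
    }

    /** Returns true only if the surrounding catch block observed the failure. */
    static boolean caughtByTryCatch() {
        try {
            requestStatus();
            return false; // The call itself returns normally; the failure lives in the future.
        } catch (Exception e) {
            return true;
        }
    }

    /** Returns true if a handler attached to the future observed the failure. */
    static boolean seenByHandler() {
        return requestStatus()
            .handle((status, throwable) -> throwable != null)
            .join();
    }
}
```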


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756524
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map<String, Object> getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   return 
accumulatorsInfo.getSerializedUserAccumulators();
--- End diff --

The accumulators should be deserialized via 
`SerializedValue#deserialize(ClassLoader)`.

If `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)` 
(which should also be overridden) was called, use the passed-in `ClassLoader`; 
otherwise use `ClassLoader.getSystemClassLoader()`.
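
The two-overload pattern suggested here could look roughly like the sketch below. It uses plain Java serialization on `byte[]` values as a stand-in for `SerializedValue`, so every class and method name in it is an assumption for illustration rather than the `RestClusterClient` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: deserialize accumulator values with a caller-supplied class loader. */
final class AccumulatorDeserializationSketch {

    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Resolve user classes through the given loader first.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Overload without a loader: falls back to the system class loader. */
    static Map<String, Object> getAccumulators(Map<String, byte[]> serialized) {
        return getAccumulators(serialized, ClassLoader.getSystemClassLoader());
    }

    /** Overload with a loader: uses the passed-in class loader for every value. */
    static Map<String, Object> getAccumulators(Map<String, byte[]> serialized, ClassLoader loader) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : serialized.entrySet()) {
            result.put(entry.getKey(), deserialize(entry.getValue(), loader));
        }
        return result;
    }
}
```

Having the single-argument overload delegate to the two-argument one keeps the deserialization logic in one place.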


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172755830
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map<String, Object> getAccumulators(final JobID jobID) throws 
Exception {
--- End diff --

we should also override `Map<String, Object> getAccumulators(JobID jobID, 
ClassLoader loader)`


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters 
{
+
+   public final AccumulatorsIncludeSerializedValueQueryParameter 
queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
--- End diff --

field name is a bit generic; how about `includeSerializedAccumulators`?


---


[GitHub] flink issue #5637: [FLINK-8860][flip6] stop SlotManager spamming logs for ev...

2018-03-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5637
  
I would go for `TRACE` given how often we set the log level to `DEBUG`.


---


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
We may want to delay merging this until 
[FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been 
addressed, as it voids the primary use-case of this handler.


---

