[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5718 I don't want to make a fuss over a single test, so for the purposes of this test it's fine to just remove the timeout. In general, however, I think it would be pretty neat if we added such a `Timeout` to the `TestLogger` class. I see the problem of debugging tests that have timeouts, but removing timeouts altogether is the wrong conclusion imo. Instead, we could implement the `Timeout` such that it doesn't fail the test if a certain profile/property is set. With this, we keep the benefits of test timeouts (CI service independence, less special behavior locally vs CI, a fixed upper bound for test times which is particularly useful for new tests, a "guarantee" that the test terminates) while still allowing debugging. In fact, we may end up improving the debugging situation by consolidating how timeouts are implemented, instead of each test rolling its own solution that can't be disabled. The Travis watchdog _cannot_ be removed, as it covers the entire Maven process, which occasionally locks up outside of tests. That said, the fact that we have to rely on the Travis watchdog _to ensure that tests terminate_ is a bad sign. It has also already forced us to introduce workarounds to make tests "compatible", like the Kafka tests and others that print stuff for the sole purpose of not triggering it. ---
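A minimal, stdlib-only sketch of a timeout that can be switched off for debugging. It is not a JUnit rule and not Flink code; the class `DisableableTimeout` and the system property `test.timeout.disabled` are hypothetical names chosen for illustration. With the property unset, the task gets a hard upper bound; with it set to `true`, the task runs unbounded (e.g. while sitting on a breakpoint).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Runs a task with an upper time bound unless timeouts are disabled via a system property. */
final class DisableableTimeout {

    // hypothetical property name, e.g. passed as -Dtest.timeout.disabled=true when debugging
    static final String DISABLE_PROPERTY = "test.timeout.disabled";

    static boolean timeoutsDisabled() {
        // true only if the property exists and equals "true" (ignoring case)
        return Boolean.getBoolean(DISABLE_PROPERTY);
    }

    static <T> T run(Callable<T> task, long timeout, TimeUnit unit) throws Exception {
        if (timeoutsDisabled()) {
            // debugging mode: no upper bound, run directly on the calling thread
            return task.call();
        }
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            try {
                return future.get(timeout, unit);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the stuck task
                throw new AssertionError("test timed out after " + timeout + " " + unit, e);
            } catch (ExecutionException e) {
                // surface the task's own failure instead of the wrapper
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                if (cause instanceof Error) {
                    throw (Error) cause;
                }
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Keeping the switch in one shared place (rather than per test) is what would make it possible to disable all timeouts at once, for example from a dedicated build profile.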
[GitHub] flink issue #5690: [FLINK-8935][tests] Extend MiniClusterClient
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5690 merging... ---
[GitHub] flink pull request #5722: [FLINK-8958][tests] Port TaskCancelAsyncProducerCo...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5722 [FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6 ## What is the purpose of the change This PR ports the `TaskCancelAsyncProducerConsumerITCase` to flip6. The existing test was renamed to `LegacyTaskCancelAsyncProducerConsumerITCase`, and a ported copy was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8958 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5722.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5722 commit 9df4a4ff1fadc12f37c94659ae13fc6ad1623d2d Author: zentol <chesnay@...> Date: 2018-03-19T14:16:18Z [FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6 ---
[GitHub] flink issue #5710: [FLINK-8948][runtime] Fix IllegalStateException when clos...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5710 merging. ---
[GitHub] flink pull request #5696: [hotfix][javadoc] fix doc of SlotProvider.allocate...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5696#discussion_r17544 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java --- @@ -46,9 +46,10 @@ /** * Allocating slot with specific requirement. * +* @param slotRequestId identifying the slot request to cancel --- End diff -- I will remove "to cancel" while merging ---
[GitHub] flink issue #5708: [FLINK-8984][network] Drop taskmanager.exactly-once.block...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5708 What would happen if `taskmanager.network.credit-based-flow-control.enabled` is enabled, but `taskmanager.exactly-once.blocking.data.enabled` is disabled? Is this an invalid setting? ---
[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5718 This change appears to be focused mostly on Travis, which I dislike a bit. I'd rather add a custom `Timeout` that prints a thread dump when the timeout fires, so we still get nice behavior when running things locally/in the IDE.
```
// imports (JUnit 4.12 or later provides Timeout(long, TimeUnit) and TestTimedOutException)
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;

import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.junit.runners.model.TestTimedOutException;

// members of the test class:

@Rule
public final Timeout timeout = new MyTimeOut(10, TimeUnit.MILLISECONDS);

public static class MyTimeOut extends Timeout {
    public MyTimeOut(long timeout, TimeUnit timeUnit) {
        super(timeout, timeUnit);
    }

    @Override
    public Statement apply(Statement base, Description description) {
        // statement that fails with a TestTimedOutException once the timeout expires
        final Statement withTimeout = super.apply(base, description);
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                try {
                    withTimeout.evaluate();
                } catch (TestTimedOutException e) {
                    // dump all threads only if the timeout actually fired, then fail as before
                    System.out.println(crunchifyGenerateThreadDump());
                    throw e;
                }
            }
        };
    }
}

public static String crunchifyGenerateThreadDump() {
    final StringBuilder dump = new StringBuilder();
    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
    for (ThreadInfo threadInfo : threadInfos) {
        // threads that terminated in the meantime are reported as null
        if (threadInfo == null) {
            continue;
        }
        dump.append('"').append(threadInfo.getThreadName()).append("\" ");
        dump.append("\n   java.lang.Thread.State: ").append(threadInfo.getThreadState());
        for (final StackTraceElement stackTraceElement : threadInfo.getStackTrace()) {
            dump.append("\n        at ").append(stackTraceElement);
        }
        dump.append("\n\n");
    }
    return dump.toString();
}
```
---
[GitHub] flink pull request #5720: [FLINK-8957][tests] Port JMXJobManagerMetricTest t...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5720 [FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6 Based on #5690. ## What is the purpose of the change Ports the `JMXJobManagerMetricTest` to use `MiniClusterResource`. ## Verifying this change Run `JMXJobManagerMetricTest` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8957 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5720.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5720 commit 42e97294d1919c7020c687f9c389f61871cc3c6a Author: zentol <chesnay@...> Date: 2018-03-19T13:17:34Z [FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6 ---
[GitHub] flink pull request #5719: [FLINK-8959][tests] Port AccumulatorLiveITCase to ...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5719 [FLINK-8959][tests] Port AccumulatorLiveITCase to flip6 Based on #5701. ## What is the purpose of the change This PR ports the `AccumulatorLiveITCase` to flip6. The existing test was renamed to `LegacyAccumulatorLiveITCase`, and a ported copy was added. The heartbeat interval is not configurable for legacy clusters and is hard-coded to 5 seconds. Porting this test (in a reliable fashion) without relying on internal/test APIs would've increased the test duration, so I decided to just leave it as it is. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8959 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5719.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5719 commit c676e2895f9adcaaddc30f12a7378ce5d6089fd6 Author: zentol <chesnay@...> Date: 2018-03-14T13:21:27Z [FLINK-8942][runtime] Pass heartbeat target ResourceID commit 5858766794f81bd111a6380ef729a9c91b4b9ced Author: zentol <chesnay@...> Date: 2018-03-14T17:52:16Z [FLINK-8881][runtime] Send accumulator updates via heartbeats commit 040c8d053501e4aba609efc9757905cd6b553ecb Author: zentol <chesnay@...> Date: 2018-03-07T10:05:42Z [FLINK-8935][tests] Implement MiniClusterClient#getAccumulators commit 84b686146d5f7176e6c1dcec9bbdf51b9c601457 Author: zentol <chesnay@...> Date: 2018-03-19T12:59:22Z [FLINK-8959][tests] Port AccumulatorLiveITCase to flip6 ---
[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5715 [FLINK-8956][tests] Port RescalingITCase to flip6 Based on #5690. ## What is the purpose of the change Ports the `RescalingITCase` to use `MiniClusterResource`. ## Verifying this change Run `RescalingITCase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8956 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5715.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5715 ---
[GitHub] flink pull request #5714: [FLINK-8925][tests] Enable Flip6 on travis
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5714 [FLINK-8925][tests] Enable Flip6 on travis
## What is the purpose of the change
This PR enables Flip6 for half of our travis profiles.
## Brief change log
* add `Old` junit category
* mark various tests with the new category
* remove 2 existing flip6 profiles
* remove `OldAndFlip6` category (implicitly applies to all tests not annotated with `Old` or `Flip6`)
* invert flip6 profile; it now *disables* flip6 and is called `old`
* add activation property to `old` profile
* activate `old` profile in half of our travis profiles
You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8924 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5714.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5714 commit 88923d3420dd8fc7a9c227cb45d020d8a38ab61d Author: zentol <chesnay@...> Date: 2018-03-12T12:04:50Z [FLINK-8924][tests] Add "Old" junit category commit 164b2ce94a1b54543810599a83b3e361358f4968 Author: zentol <chesnay@...> Date: 2018-03-12T12:05:55Z [FLINK-8925][tests] Enable Flip6 on travis ---
[GitHub] flink issue #5689: [FLINK-4569][tests] Respect exceptions thrown in thread i...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5689 merging, ---
[GitHub] flink pull request #5701: 8703 c savepoint
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5701 8703 c savepoint
Based on #5690 and #5699.
## What is the purpose of the change
With this PR accumulator updates are sent via heartbeats from the TaskManager to JobManagers. The SavepointMigrationTestBase was also ported to flip6, serving as preliminary verification until the other accumulator tests are ported.
## Verifying this change
*(Please pick either of the following options)*
This change added tests and can be verified as follows:
* run SavepointMigrationTestBase
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_c_savepoint Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5701.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5701 commit 14bcb66d858ee3e9488a51df7bfa1e36ec97f463 Author: zentol <chesnay@...> Date: 2018-03-14T13:21:27Z [FLINK-8942][runtime] Pass heartbeat target ResourceID commit 761fd90f07335883429fc80fb4032b9ef28d32f5 Author: zentol <chesnay@...> Date: 2018-03-14T17:52:16Z [FLINK-8881][runtime] Send accumulator updates via heartbeats commit 280874939c4d77e893da80e1e40acdcc869280bb Author: zentol <chesnay@...> Date: 2018-03-07T10:05:42Z [FLINK-8935][tests] Implement MiniClusterClient#getAccumulators commit d2fa754d9c5b68672ddc16233cdf390dabfd17c0 Author: zentol <chesnay@...> Date: 2018-03-06T12:26:59Z [FLINK-8935][tests] Implement MiniClusterClient#triggerSavepoint commit ea04fd437d305a76c74edc6aa6c473a1fa917895 Author: zentol <chesnay@...> Date: 2018-02-26T13:54:07Z [FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterResource ---
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5699 [FLINK-8942][runtime] Pass heartbeat target ResourceID
## What is the purpose of the change
With this PR, the heartbeat target `ResourceID` is passed to the `HeartbeatListener` when retrieving the payload to send. This allows the listener to create target-dependent payloads. The primary use-case is FLINK-8881, where accumulators are sent via heartbeats to the JobManager. Here we only want to send accumulators for the relevant job, and not for all jobs.
## Brief change log
* add a `ResourceID` parameter to `HeartbeatListener#retrievePayload`
* modify return type of `HeartbeatManagerImpl#getHeartbeatTargets` to also contain the target `ResourceID`
## Verifying this change
This change added tests to `HeartbeatManagerTest`:
* `testHeartbeatManagerTargetPayload`
* `testHeartbeatManagerSenderTargetPayload`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8942 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5699.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5699 commit d264dbbd260abc995ced8095a220260e8e0b3f1a Author: zentol <chesnay@...> Date: 2018-03-14T13:21:27Z [FLINK-8942][runtime] Pass heartbeat target ResourceID ---
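A simplified sketch of what passing the target's `ResourceID` to `retrievePayload` enables: a listener that reports only the accumulators of the job handled by the heartbeat target. Resource IDs and job IDs are plain strings here, and `TargetAwareHeartbeatListener` and `AccumulatorReportingListener` are hypothetical stand-ins, not Flink's actual `HeartbeatListener` API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified listener whose payload depends on the heartbeat target. */
interface TargetAwareHeartbeatListener<O> {
    // the target's id is passed in, mirroring the extra ResourceID parameter described above
    O retrievePayload(String targetResourceId);
}

/** Hypothetical TaskManager-side listener: only report accumulators for the target's job. */
final class AccumulatorReportingListener implements TargetAwareHeartbeatListener<Map<String, Long>> {

    // job id -> accumulator value (simplified to a single long per job)
    private final Map<String, Long> accumulatorsByJob = new HashMap<>();
    // JobManager resource id -> id of the job it is responsible for
    private final Map<String, String> jobByJobManager = new HashMap<>();

    void updateAccumulator(String jobId, long value) {
        accumulatorsByJob.put(jobId, value);
    }

    void registerJobManager(String jobManagerId, String jobId) {
        jobByJobManager.put(jobManagerId, jobId);
    }

    @Override
    public Map<String, Long> retrievePayload(String targetResourceId) {
        Map<String, Long> payload = new HashMap<>();
        String jobId = jobByJobManager.get(targetResourceId);
        // without the target id, the listener could only send the accumulators of all jobs
        if (jobId != null && accumulatorsByJob.containsKey(jobId)) {
            payload.put(jobId, accumulatorsByJob.get(jobId));
        }
        return payload;
    }
}
```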
[GitHub] flink issue #5666: [FLINK-8703][tests] Port KafkaShortRetentionTestBase to M...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5666 merging ---
[GitHub] flink issue #5667: [FLINK-8703][tests] Port NotSoMiniClusterIterations to Mi...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5667 merging ---
[GitHub] flink issue #5668: [FLINK-8703][tests] Port StreamingScalabilityAndLatency t...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5668 merging ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174450032 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java --- @@ -140,19 +159,28 @@ public boolean contains(UK userKey) { } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + byte[] serializedKeyAndNamespace, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception { - HashMap<UK, UV> result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - if (null == result) { + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer; --- End diff -- I mean, why isn't the signature of `KvStateSerializer.serializeMap`:
```
KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer);
```
with the following implementation:
```
... serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) {
    if (map != null) {
        DataOutputSerializer dos = new DataOutputSerializer(32);
        serializer.serialize(map, dos);
        return dos.getCopyOfBuffer();
    } else {
        return null;
    }
}
```
Why deal with the map key/value entries at all outside the serializer? ---
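To illustrate the suggestion, here is a stdlib-only sketch in which the map serializer owns the entire map layout, so `serializeMap` never touches individual entries. `SimpleSerializer`, `StringIntMapSerializer` and `MapSerialization` are hypothetical stand-ins for Flink's `TypeSerializer`/`DataOutputSerializer`, and the sketch assumes the map holds no null values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal serializer abstraction (not Flink's TypeSerializer). */
interface SimpleSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;

    T deserialize(DataInputStream in) throws IOException;
}

/** Serializes a whole map: entry count, then each key/value pair (values must be non-null). */
final class StringIntMapSerializer implements SimpleSerializer<Map<String, Integer>> {
    @Override
    public void serialize(Map<String, Integer> map, DataOutputStream out) throws IOException {
        out.writeInt(map.size());
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeInt(entry.getValue());
        }
    }

    @Override
    public Map<String, Integer> deserialize(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, Integer> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            map.put(in.readUTF(), in.readInt());
        }
        return map;
    }
}

final class MapSerialization {
    /** Delegates the whole map to its serializer; returns null for a null map. */
    static <M> byte[] serializeMap(M map, SimpleSerializer<M> serializer) throws IOException {
        if (map == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(32);
        DataOutputStream out = new DataOutputStream(bytes);
        serializer.serialize(map, out);
        out.flush();
        return bytes.toByteArray();
    }

    static <M> M deserializeMap(byte[] bytes, SimpleSerializer<M> serializer) throws IOException {
        return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
    }
}
```

The caller only needs a serializer for the map type; how keys and values are laid out stays an implementation detail of that serializer.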
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r17107 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. 
+ * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace the state is associated to + * @param <V> The type of values kept internally in state + */ +@Internal +public class KvStateEntry<K, N, V> { + + private final InternalKvState<K, N, V> state; + private final KvStateInfo<K, N, V> stateInfo; + + private final boolean isSerializerStateless; + + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; + + public KvStateEntry(final InternalKvState<K, N, V> state) { + this.state = Preconditions.checkNotNull(state); + this.stateInfo = new KvStateInfo<>( + state.getKeySerializer(), + state.getNamespaceSerializer(), + state.getValueSerializer() + ); + this.serializerCache = new ConcurrentHashMap<>(); + this.isSerializerStateless = stateInfo.duplicate() == stateInfo; + } + + public InternalKvState<K, N, V> getState() { + return state; + } + + public KvStateInfo<K, N, V> getInfoForCurrentThread() { + return isSerializerStateless + ? stateInfo + : serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate()); + } + + public void clear() { + if (serializerCache != null) { + serializerCache.clear(); + } + } + + @VisibleForTesting + public int getCacheSize() { + return serializerCache == null ? -1 : serializerCache.size(); --- End diff -- unnecessary null check ---
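A stdlib-only sketch of the caching scheme under review, with the cache always initialized so that neither `clear()` nor the size accessor needs a null check. `SerializerInfo` and `PerThreadInfoCache` are hypothetical simplifications of `KvStateInfo` and `KvStateEntry`; as in the diff, `duplicate()` returning the same instance is taken to mean the serializers are stateless.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for KvStateInfo: duplicate() returns this if nothing must be copied. */
final class SerializerInfo {
    private final boolean stateful;

    SerializerInfo(boolean stateful) {
        this.stateful = stateful;
    }

    SerializerInfo duplicate() {
        return stateful ? new SerializerInfo(true) : this;
    }
}

/** Hands each thread its own duplicate of stateful serializers, shares stateless ones. */
final class PerThreadInfoCache {
    private final SerializerInfo info;
    private final boolean areSerializersStateless;
    // always initialized, so clear() and cacheSize() need no null checks
    private final ConcurrentMap<Thread, SerializerInfo> cache = new ConcurrentHashMap<>();

    PerThreadInfoCache(SerializerInfo info) {
        this.info = info;
        this.areSerializersStateless = info.duplicate() == info;
    }

    SerializerInfo getInfoForCurrentThread() {
        return areSerializersStateless
            ? info
            : cache.computeIfAbsent(Thread.currentThread(), t -> info.duplicate());
    }

    void clear() {
        cache.clear();
    }

    int cacheSize() {
        return cache.size();
    }
}
```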
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r17245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -39,29 +39,27 @@ private final InternalKvState<K, N, V> state; private final KvStateInfo<K, N, V> stateInfo; + private final boolean isSerializerStateless; + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; public KvStateEntry(final InternalKvState<K, N, V> state) { - this.state = Preconditions.checkNotNull(state); this.stateInfo = new KvStateInfo<>( state.getKeySerializer(), state.getNamespaceSerializer(), state.getValueSerializer() ); - - this.serializerCache = - stateInfo.duplicate() == stateInfo - ? null // if the serializers are stateless, we do not need a cache - : new ConcurrentHashMap<>(); + this.serializerCache = new ConcurrentHashMap<>(); + this.isSerializerStateless = stateInfo.duplicate() == stateInfo; --- End diff -- -> areSerializersStateless? ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r17070 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. 
+ * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace the state is associated to + * @param <V> The type of values kept internally in state + */ +@Internal +public class KvStateEntry<K, N, V> { + + private final InternalKvState<K, N, V> state; + private final KvStateInfo<K, N, V> stateInfo; + + private final boolean isSerializerStateless; + + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; + + public KvStateEntry(final InternalKvState<K, N, V> state) { + this.state = Preconditions.checkNotNull(state); + this.stateInfo = new KvStateInfo<>( + state.getKeySerializer(), + state.getNamespaceSerializer(), + state.getValueSerializer() + ); + this.serializerCache = new ConcurrentHashMap<>(); + this.isSerializerStateless = stateInfo.duplicate() == stateInfo; + } + + public InternalKvState<K, N, V> getState() { + return state; + } + + public KvStateInfo<K, N, V> getInfoForCurrentThread() { + return isSerializerStateless + ? stateInfo + : serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate()); + } + + public void clear() { + if (serializerCache != null) { --- End diff -- unnecessary null check ---
[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5697 [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest ## What is the purpose of the change This PR ports the `ScheduleOrUpdateConsumersTest` to flip6. The existing test was renamed to `LegacyScheduleOrUpdateConsumersTest`, and a ported copy was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8704_schedule Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5697.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5697 commit d7f42bf65177bb57f8159f7866397f4e55c5d9f0 Author: zentol <chesnay@...> Date: 2018-03-12T12:21:01Z [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174381716 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java --- @@ -686,7 +686,7 @@ public void testClientServerIntegration() throws Throwable { state.update(201 + i); // we know it must be a KvStat but this is not exposed to the user via State --- End diff -- typo: KvStat ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174380197 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java --- @@ -20,13 +20,16 @@ import org.apache.flink.api.common.state.MapState; +import java.util.Map; + /** * The peer to the {@link MapState} in the internal state type hierarchy. * * See {@link InternalKvState} for a description of the internal state hierarchy. * - * @param The type of the namespace - * @param Type of the values folded into the state - * @param Type of the value in the state + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the values folded into the state --- End diff -- the description seems like a copy from `FoldingState` ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174385360 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); --- End diff -- this would never directly fail the test, since the `AssertionError` is thrown in the spawned thread rather than the test thread, and we may lose the exception ---
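`Assert.fail` inside a spawned thread only throws an `AssertionError` in that thread, so the test thread never sees it. A common way around this, sketched here with the standard library only, is to record the first failure and rethrow it on the test thread after joining the workers. `WorkerFailures.runAll` is a hypothetical helper, and it does not reproduce the latch choreography of the test in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Runs workers on separate threads and rethrows the first failure on the calling thread. */
final class WorkerFailures {

    static void runAll(List<Runnable> workers) throws InterruptedException {
        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        List<Thread> threads = new ArrayList<>();
        for (Runnable worker : workers) {
            Thread thread = new Thread(() -> {
                try {
                    worker.run();
                } catch (Throwable t) {
                    // would otherwise only terminate this thread; keep it for the caller
                    firstFailure.compareAndSet(null, t);
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        Throwable failure = firstFailure.get();
        if (failure != null) {
            // fail on the test thread, with the original exception as the cause
            throw new AssertionError("worker thread failed", failure);
        }
    }
}
```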
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174380673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java --- @@ -30,18 +30,17 @@ import java.io.IOException; /** - * Heap-backed partitioned {@link ReducingState} that is - * snapshotted into files. + * Heap-backed partitioned {@link ReducingState} that is snapshotted into files. * - * @param The type of the key. - * @param The type of the namespace. - * @param The type of the value added to the state. - * @param The type of the value stored in the state (the accumulator type). - * @param The type of the value returned from the state. + * @param The type of the key. + * @param The type of the namespace. + * @param The type of the value added to the state. + * @param The type of the value stored in the state (the accumulator type). + * @param The type of the value returned from the state. --- End diff -- indentation ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174380400 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java --- @@ -24,9 +24,11 @@ * The peer to the {@link AppendingState} in the internal state type hierarchy. * * See {@link InternalKvState} for a description of the internal state hierarchy. - * - * @paramThe type of the namespace - * @param The type of elements added to the state - * @param The type of the + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param The type of elements added to the state + * @param The type of elements in the state + * @param The type of the resulting element in the state --- End diff -- indentation is weird ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174384860 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. + Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo infoA: infos) { + boolean found = false; + for (KvStateInfo infoB: infos) { + if (infoA == infoB) { + if (found) { + Assert.fail("Already found"); + } + found = true; + } else { + Assert.assertTrue(infoA != infoB && infoA.equals(infoB)); --- End diff -- `infoA != infoB` is redundant ---
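Since the `infoA == infoB` case is already handled by the `if` branch, the nested loop effectively checks that every info is a distinct instance while all of them are equal. A compact sketch of that same check using an identity-based set; `DistinctButEqual` is a hypothetical helper, not part of the PR.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

final class DistinctButEqual {

    /** True if no element occurs twice by reference and all elements are equal by equals(). */
    static boolean allDistinctInstancesButEqual(List<?> items) {
        // a set that compares by reference (==) instead of equals()
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Object item : items) {
            if (!seen.add(item)) {
                return false; // the same instance was handed out twice
            }
        }
        for (Object item : items) {
            if (!item.equals(items.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```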
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174381475 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java --- @@ -136,13 +138,13 @@ public void unregisterKvState( } /** -* Returns the KvState instance identified by the given KvStateID or -* null if none is registered. +* Returns the KvState instance identified by the given KvStateID along with --- End diff -- would rephrase to `KvStateEntry` containing the `KvState` ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174380623 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -29,20 +29,19 @@ import java.io.IOException; /** - * Heap-backed partitioned {@link FoldingState} that is - * snapshotted into files. + * Heap-backed partitioned {@link FoldingState} that is snapshotted into files. * - * @param The type of the key. - * @param The type of the namespace. - * @param The type of the values that can be folded into the state. - * @param The type of the value in the folding state. + * @param The type of the key. + * @param The type of the namespace. + * @param The type of the values that can be folded into the state. + * @param The type of the value in the folding state. --- End diff -- indentation is funky ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174383199 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +/** + * Metadata about a {@link InternalKvState}. This includes the serializers for + * the key, the namespace, and the values kept in the state. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace the state is associated to + * @param The type of values kept internally in state + */ +public class KvStateInfo<K, N, V> { + + private final TypeSerializer keySerializer; + private final TypeSerializer namespaceSerializer; + private final TypeSerializer stateValueSerializer; + + public KvStateInfo( + final TypeSerializer keySerializer, + final TypeSerializer namespaceSerializer, + final TypeSerializer stateValueSerializer + ) { + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); + this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer); + } + + /** +* @return The serializer for the key the state is associated to. +*/ + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + /** +* @return The serializer for the namespace the state is associated to. +*/ + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + /** +* @return The serializer for the values kept in the state. +*/ + public TypeSerializer getStateValueSerializer() { + return stateValueSerializer; + } + + /** +* Creates a deep copy of the current {@link KvStateInfo} by duplicating +* all the included serializers. +* +* This method assumes correct implementation of the {@link TypeSerializer#duplicate()} +* method of the included serializers. +*/ + public KvStateInfo<K, N, V> duplicate() { + final TypeSerializer dupKeySerializer = keySerializer.duplicate(); + final TypeSerializer dupNamespaceSerializer = namespaceSerializer.duplicate(); + final TypeSerializer dupSVSerializer = stateValueSerializer.duplicate(); + + if ( + dupKeySerializer == keySerializer && --- End diff -- odd formatting ---
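The `duplicate()` logic quoted above could be formatted more conventionally by naming the combined identity check. The following is a minimal sketch that uses a hypothetical `DuplicableSerializer` interface in place of Flink's `TypeSerializer`. It assumes the same convention that `duplicate()` returns `this` for stateless serializers. `StateInfoSketch`, `StatelessSerializer` and `BufferingSerializer` are also illustrative names.

```java
import java.util.Objects;

/** Hypothetical stand-in for TypeSerializer: duplicate() returns this if the serializer is stateless. */
interface DuplicableSerializer {
    DuplicableSerializer duplicate();
}

/** A stateless serializer can be shared, so duplicate() returns itself. */
final class StatelessSerializer implements DuplicableSerializer {
    @Override
    public DuplicableSerializer duplicate() {
        return this;
    }
}

/** A stateful serializer: every duplicate gets its own mutable buffer. */
final class BufferingSerializer implements DuplicableSerializer {
    private final StringBuilder buffer = new StringBuilder(); // mutable state, must not be shared

    @Override
    public DuplicableSerializer duplicate() {
        return new BufferingSerializer();
    }
}

final class StateInfoSketch {
    private final DuplicableSerializer keySerializer;
    private final DuplicableSerializer namespaceSerializer;
    private final DuplicableSerializer valueSerializer;

    StateInfoSketch(DuplicableSerializer keySerializer,
                    DuplicableSerializer namespaceSerializer,
                    DuplicableSerializer valueSerializer) {
        this.keySerializer = Objects.requireNonNull(keySerializer);
        this.namespaceSerializer = Objects.requireNonNull(namespaceSerializer);
        this.valueSerializer = Objects.requireNonNull(valueSerializer);
    }

    StateInfoSketch duplicate() {
        final DuplicableSerializer dupKeySerializer = keySerializer.duplicate();
        final DuplicableSerializer dupNamespaceSerializer = namespaceSerializer.duplicate();
        final DuplicableSerializer dupValueSerializer = valueSerializer.duplicate();

        final boolean allStateless = dupKeySerializer == keySerializer
            && dupNamespaceSerializer == namespaceSerializer
            && dupValueSerializer == valueSerializer;

        // Reuse this instance if no serializer had to be copied.
        return allStateless
            ? this
            : new StateInfoSketch(dupKeySerializer, dupNamespaceSerializer, dupValueSerializer);
    }
}
```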
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174385834 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. + Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo infoA: infos) { + boolean found = false; + for (KvStateInfo infoB: infos) { + if (infoA == infoB) { + if (found) { --- End diff -- `found` needs a better name or a comment what it means ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174379723 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java --- @@ -26,12 +26,14 @@ * The peer to the {@link MergingState} in the internal state type hierarchy. * * See {@link InternalKvState} for a description of the internal state hierarchy. - * - * @paramThe type of the namespace - * @param The type of elements added to the state - * @param The type of elements + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param The type of elements added to the state + * @param The type of elements in the state + * @param The type of elements --- End diff -- indentation is off and the description deserves an extension ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174380744 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java --- @@ -28,18 +28,19 @@ import java.util.Collection; /** - * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState}) - * that is stored on the heap. + * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap. * - * @param The type of the key. - * @param The type of the namespace. - * @param The type of the values in the state. - * @param The type of State - * @param The type of StateDescriptor for the State S + * @param The type of the key. + * @param The type of the namespace. + * @param The type of the input elements. + * @param The type of the values in the state. + * @param The type of the output elements. --- End diff -- indentation ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174385129 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. 
+ Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo infoA: infos) { + boolean found = false; + for (KvStateInfo infoB: infos) { + if (infoA == infoB) { + if (found) { + Assert.fail("Already found"); + } + found = true; + } else { + Assert.assertTrue(infoA != infoB && infoA.equals(infoB)); + } + } + } + + kvStateRegistry.unregisterKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + stateID); + + // we have to call for garbage collection to be sure that everything is cleared up. --- End diff -- ? ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174383043 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace the state is associated to + * @param The type of values kept internally in state + */ +@Internal +public class KvStateEntry<K, N, V> { + + private final InternalKvState<K, N, V> state; + private final KvStateInfo<K, N, V> stateInfo; + + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; + + public KvStateEntry(final InternalKvState<K, N, V> state) { + + this.state = Preconditions.checkNotNull(state); + this.stateInfo = new KvStateInfo<>( + state.getKeySerializer(), + state.getNamespaceSerializer(), + state.getValueSerializer() + ); + + this.serializerCache = + stateInfo.duplicate() == stateInfo + ? null // if the serializers are stateless, we do not need a cache --- End diff -- An empty map would be acceptable here. ---
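One way to read the empty-map suggestion is to always allocate the cache and remember separately whether copies are needed. With that approach, `getCacheSize()` and any cleanup code need no null checks. The sketch below uses a hypothetical `DuplicableInfo` interface instead of the real `KvStateInfo`. `StateEntrySketch` is an illustrative name, and `computeIfAbsent` is the standard `ConcurrentMap` method. As in the quoted code, the cache is keyed by `Thread`, and entries are never evicted in this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for KvStateInfo: duplicate() returns this if nothing needs copying. */
interface DuplicableInfo<I extends DuplicableInfo<I>> {
    I duplicate();
}

final class StateEntrySketch<I extends DuplicableInfo<I>> {
    private final I stateInfo;

    // true if duplicate() hands back the same instance, i.e. it can be shared by all threads
    private final boolean shareable;

    // never null; simply stays empty when the info can be shared
    private final ConcurrentMap<Thread, I> serializerCache = new ConcurrentHashMap<>();

    StateEntrySketch(I stateInfo) {
        this.stateInfo = stateInfo;
        this.shareable = stateInfo.duplicate() == stateInfo;
    }

    I getInfoForCurrentThread() {
        if (shareable) {
            return stateInfo;
        }
        // each thread gets (and keeps) its own copy
        return serializerCache.computeIfAbsent(Thread.currentThread(), thread -> stateInfo.duplicate());
    }

    int getCacheSize() {
        return serializerCache.size(); // no null check needed
    }
}
```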
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174385024 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. + Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo infoA: infos) { + boolean found = false; + for (KvStateInfo infoB: infos) { + if (infoA == infoB) { + if (found) { + Assert.fail("Already found"); --- End diff -- needs a better error message ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174382685 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java --- @@ -78,13 +75,22 @@ public KvStateServerHandler( final CompletableFuture responseFuture = new CompletableFuture<>(); try { - final InternalKvState kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + // here we remove any type check... + // Ideally we want to keep that the info match the state. --- End diff -- you can retain type safety:

Call from the handler:
```
byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
```
Added method:
```
private static <K, N, V> byte[] getSerializedValue(KvStateEntry<K, N, V> entry, byte[] serializedKeyAndNamespace) throws Exception {
	InternalKvState<K, N, V> state = entry.getState();
	KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
	return state.getSerializedValue(
		serializedKeyAndNamespace,
		infoForCurrentThread.getKeySerializer(),
		infoForCurrentThread.getNamespaceSerializer(),
		infoForCurrentThread.getStateValueSerializer()
	);
}
```
---
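The suggestion above relies on Java's capture conversion. Passing a value whose type is only known through wildcards (or raw types) to a private generic method binds the unknown types to fresh type variables. Inside that method, the state and its serializers are then checked against each other. The following standalone sketch shows the same technique with hypothetical, simplified types (`Codec`, `TypedState`, `TypedEntry`, `QueryHandler`) rather than Flink's classes, and uses two type parameters instead of three.

```java
/** Hypothetical, simplified stand-ins: a codec, a state, and an entry pairing them. */
interface Codec<T> {
    T decode(String raw);

    String encode(T value);
}

interface TypedState<K, V> {
    V get(K key);
}

final class TypedEntry<K, V> {
    final TypedState<K, V> state;
    final Codec<K> keyCodec;
    final Codec<V> valueCodec;

    TypedEntry(TypedState<K, V> state, Codec<K> keyCodec, Codec<V> valueCodec) {
        this.state = state;
        this.keyCodec = keyCodec;
        this.valueCodec = valueCodec;
    }
}

final class QueryHandler {
    /** The handler only knows the entry as TypedEntry<?, ?>, like a registry lookup result. */
    static String handle(TypedEntry<?, ?> entry, String rawKey) {
        // Passing the wildcard type to a generic method binds it to fresh K and V
        // (capture conversion), so no raw types or unchecked casts are needed.
        return query(entry, rawKey);
    }

    private static <K, V> String query(TypedEntry<K, V> entry, String rawKey) {
        final K key = entry.keyCodec.decode(rawKey);
        final V value = entry.state.get(key);
        return value == null ? null : entry.valueCodec.encode(value);
    }
}
```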
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174383696 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java --- @@ -140,19 +159,28 @@ public boolean contains(UK userKey) { } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + byte[] serializedKeyAndNamespace, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception { - HashMap<UK, UV> result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - if (null == result) { + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer; --- End diff -- This shouldn't be necessary. Why can't we just pass the map serializer into `serializeMap`? ---
[GitHub] flink issue #5677: [hotfix] [streaming API] update doc of InternalTimerServi...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5677 merging. ---
[GitHub] flink issue #5692: [docs] Fix typo in Python API docs
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5692 merging. ---
[GitHub] flink issue #5679: [FLINK-8916] Checkpointing Mode is always shown to be "At...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5679 merging. ---
[GitHub] flink issue #5670: [FLINK-8904][cli][tests] always restore the previous syso...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5670 merging. ---
[GitHub] flink pull request #5695: [FLINK-8704][tests] Port PartialConsumerPipelinedR...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5695 [FLINK-8704][tests] Port PartialConsumerPipelinedResultTest ## What is the purpose of the change This PR ports the `PartialConsumerPipelinedResultTest` to flip6. The existing test was renamed to `LegacyPartialConsumerPipelinedResultTest`, and a ported copy was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8704_pipe Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5695.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5695 commit 4baf286842849652d2b33654f93ef3cb7870c2d2 Author: zentol <chesnay@...> Date: 2018-03-12T12:09:44Z [FLINK-8704][tests] Port PartialConsumerPipelinedResultTest ---
[GitHub] flink pull request #5694: [FLINK-8704][tests] Port SlotCountExceedingParalle...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5694 [FLINK-8704][tests] Port SlotCountExceedingParallelismTest ## What is the purpose of the change This PR ports the `SlotCountExceedingParallelismTest` to flip6. The existing test was renamed to `LegacySlotCountExceedingParallelismTest`, and a ported copy was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8704_slot Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5694.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5694 commit bd6913fe5a430e9fa4b718c905ea16979eaab1d2 Author: zentol <chesnay@...> Date: 2018-03-12T12:52:57Z [FLINK-8704][tests] Port SlotCountExceedingParallelismTest ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174148331 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java --- @@ -78,13 +75,22 @@ public KvStateServerHandler( final CompletableFuture responseFuture = new CompletableFuture<>(); try { - final InternalKvState kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + // here we remove any type check... + // Ideally we want to keep that the info match the state. + final InternalKvState state = kvState.getState(); + final KvStateInfo info = kvState.getInfoForCurrentThread(); + + byte[] serializedResult = state.getSerializedValue( --- End diff -- Well, we could move the thread-caching into the `InternalKvState` implementations. ---
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174144247 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java --- @@ -78,13 +75,22 @@ public KvStateServerHandler( final CompletableFuture responseFuture = new CompletableFuture<>(); try { - final InternalKvState kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + // here we remove any type check... + // Ideally we want to keep that the info match the state. + final InternalKvState state = kvState.getState(); + final KvStateInfo info = kvState.getInfoForCurrentThread(); + + byte[] serializedResult = state.getSerializedValue( --- End diff -- Ah, I mistakenly thought that `getSerializedValue()` is also called outside of QS. What would be the overhead of always duplicating the serializers within `getSerializedValue()`? ---
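To make the "always duplicate" alternative concrete, here is a minimal sketch with a hypothetical `ReusingSerializer` that shares an internal buffer. Because of that buffer, a single instance must not be used by two threads at once. `DuplicatingState` calls `duplicate()` on every request, so no per-thread cache is needed. The cost is one `duplicate()` call per request. According to the `TypeSerializer#duplicate()` contract, a stateless serializer may return itself, so in that case the copy would be free. How expensive duplication is for real stateful serializers is not measured here.

```java
/** Hypothetical serializer that reuses an internal buffer: not safe for concurrent use. */
final class ReusingSerializer {
    private final StringBuilder buffer = new StringBuilder();

    String serialize(int value) {
        buffer.setLength(0);
        buffer.append("value=").append(value);
        return buffer.toString();
    }

    ReusingSerializer duplicate() {
        return new ReusingSerializer();
    }
}

final class DuplicatingState {
    private final ReusingSerializer serializer = new ReusingSerializer();

    /** Safe for concurrent callers: each call serializes with its own fresh copy. */
    String getSerializedValue(int value) {
        // one duplicate() per call; here that is one small allocation
        final ReusingSerializer perCallSerializer = serializer.duplicate();
        return perCallSerializer.serialize(value);
    }
}
```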
[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5689#discussion_r174140412 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java --- @@ -119,6 +121,11 @@ public void run() { lock.release(); resumingThread.join(); + + Throwable exception = error.get(); + if (exception != null) { + throw new AssertionError(exception); + } --- End diff -- `assertNull` would only report the exception's string representation, without its stack trace. This throws an `AssertionError` that carries the actual exception (including its stack trace) as its cause, which is the closest equivalent to rethrowing the exception itself. ---
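A self-contained sketch of the pattern discussed here: the worker thread stores any failure in an `AtomicReference`, and the test thread rethrows it after `join()`. It uses `new AssertionError(Object)`, which makes a `Throwable` argument the cause of the error, so the original stack trace is preserved. `ThreadFailures` and `runInThreadAndRethrow` are hypothetical names, not the code from the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ThreadFailures {

    /** Runs the task in a separate thread and fails the caller if the task threw. */
    static void runInThreadAndRethrow(Runnable task) {
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                error.set(t); // remember the failure for the calling thread
            }
        }, "worker");
        worker.start();

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the worker", e);
        }

        final Throwable exception = error.get();
        if (exception != null) {
            // AssertionError(Object) uses a Throwable argument as the cause, keeping its stack trace.
            throw new AssertionError(exception);
        }
    }
}
```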
[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174128377 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java --- @@ -78,13 +75,22 @@ public KvStateServerHandler( final CompletableFuture responseFuture = new CompletableFuture<>(); try { - final InternalKvState kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + // here we remove any type check... + // Ideally we want to keep that the info match the state. + final InternalKvState state = kvState.getState(); + final KvStateInfo info = kvState.getInfoForCurrentThread(); + + byte[] serializedResult = state.getSerializedValue( --- End diff -- Couldn't we synchronize on `kvState` instead of modifying all `InternalKvState` implementations? This seems like a much safer alternative than baking in the assumption that `getSerializedValue()` can be called concurrently. ---
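A minimal sketch of the synchronization alternative: all query threads share one entry that holds a single non-thread-safe serializer, and the handler locks on the entry while serializing. `ReusingSerializer`, `SharedEntry` and `SynchronizedHandler` are hypothetical names. Note the caveat in the comment: the lock protects the serializer only if every code path that uses it synchronizes on the same object.

```java
/** Hypothetical serializer with an internal buffer: not safe for concurrent use. */
final class ReusingSerializer {
    private final StringBuilder buffer = new StringBuilder();

    String serialize(int value) {
        buffer.setLength(0);
        buffer.append("value=").append(value);
        return buffer.toString();
    }
}

/** Hypothetical entry shared by all query threads; it holds a single serializer instance. */
final class SharedEntry {
    final ReusingSerializer serializer = new ReusingSerializer();
}

final class SynchronizedHandler {
    static String getSerializedValue(SharedEntry entry, int value) {
        // Only one thread at a time may use this entry's serializer.
        // This protects the serializer only if every user of it takes the same lock.
        synchronized (entry) {
            return entry.serializer.serialize(value);
        }
    }
}
```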
[GitHub] flink pull request #5690: [FLINK-8935][tests] Extend MiniClusterClient
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5690 [FLINK-8935][tests] Extend MiniClusterClient ## What is the purpose of the change With this PR the `MiniClusterClient` implements `listJobs()`, `getAccumulators()`, `triggerSavepoint()` and `stop()`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8935 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5690.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5690 commit 4382ae8aa781ec7bc469fb6ceed551434478b342 Author: zentol <chesnay@...> Date: 2018-03-07T10:05:12Z [FLINK-8935][tests] Implement MiniClusterClient#listJobs commit f15bc13d2b749d300dfb3762ab8faf36e963aa4f Author: zentol <chesnay@...> Date: 2018-03-07T10:05:42Z [FLINK-8935][tests] Implement MiniClusterClient#getAccumulators commit 5ae94c92906784620fc4fd7e6dbcc4c5ff62683d Author: zentol <chesnay@...> Date: 2018-03-06T12:26:59Z [FLINK-8935][tests] Implement MiniClusterClient#triggerSavepoint commit a263e3f39899c3d6b0cfbf623609007ba03f0f0b Author: zentol <chesnay@...> Date: 2018-03-07T12:02:27Z [FLINK-8935][tests] Implement MiniClusterClient#stop ---
[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5689 [FLINK-4569][tests] Respect exceptions thrown in thread in JobRetrievalITCase ## What is the purpose of the change This PR ensures that exceptions that occur in the submitting thread actually fail the test. Also contains minor fixes, like setting a thread name (for debugging purposes) and releasing the lock in the `SemaphoreInvokable` to allow multiple runs in the IDE. ## Verifying the change One can modify the test to throw an exception after connecting successfully, which will fail the test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 4569 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5689.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5689 commit 13c383d439550b1ebdc869981a7ffa19a6c8a151 Author: zentol <chesnay@...> Date: 2018-03-13T12:00:47Z [FLINK-4569][tests] Respect exceptions thrown in thread in JobRetrievalITCase ---
[GitHub] flink pull request #5687: [FLINK-8934] [flip6] Properly cancel slot requests...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5687#discussion_r174060373 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java --- @@ -505,17 +513,21 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception } catch (ExecutionException ee) { // expected assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof FlinkException); - } - final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + assertEquals(allocationId1, canceledSlotRequests.take()); + + final SlotOffer slotOffer = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); // the slot offer should fulfill the second slot request - assertEquals(allocationId, slotFuture2.get().getAllocationId()); + assertEquals(allocationId1, slotFuture2.get().getAllocationId()); + + // check that the second slot request has been canceled --- End diff -- Let's see if I understood the scenario here correctly: We request the allocation of 2 slots. We cancel the first allocation request, but a TaskManager has already offered a slot to fulfill it. We now have one pending allocation request and one offered slot, so we re-use the slot for the second request, which we can do since both requests were for the same job with the same resource requirements. I think we can improve the wording a bit, though: this comment says that the second request has been canceled, when just above it was fulfilled. I guess it should say that the slot _acquisition_ (i.e. the retrieval of slots from the RM) has been canceled. (This also applies to the `canceledSlotRequests` variable.) ---
[GitHub] flink pull request #5681: [FLINK-4569][tests] Fix JobRetrievalITCase
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/5681 ---
[GitHub] flink pull request #5681: [FLINK-4569][tests] Fix JobRetrievalITCase
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5681 [FLINK-4569][tests] Fix JobRetrievalITCase ## What is the purpose of the change This PR contains 2 changes to the `JobRetrievalITCase`. 1) We now store exceptions that occur in the submitting thread in an `AtomicReference` that we evaluate at the end of the test. 2) It could happen that the lock (preventing the job from terminating) was released before the client had a chance to finish the retrieval. To remedy this, we now only release the lock once the thread has terminated, either by connecting successfully or by failing with an error. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 4569 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5681.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5681 commit d45d8883365ef73c7064adfd098feea208d2ca4c Author: zentol <chesnay@...> Date: 2018-03-12T11:01:57Z [FLINK-4569][tests] Fix JobRetrievalITCase ---
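The ordering fix in point 2 can be sketched with plain threads. A hypothetical "job" thread blocks on a `Semaphore` until a permit is released. The test thread first joins the "client" thread and only then releases the permit, so the client is guaranteed to see the job still running. `ReleaseAfterClientTerminates` is an illustrative name, and `job.isAlive()` stands in for the actual job retrieval.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

final class ReleaseAfterClientTerminates {

    /** Returns true if the client observed the job while it was still running. */
    static boolean run() {
        final Semaphore jobLock = new Semaphore(0); // the "job" blocks until a permit is released
        final Thread job = new Thread(jobLock::acquireUninterruptibly, "job");
        job.start();

        final AtomicBoolean sawRunningJob = new AtomicBoolean();
        final Thread client = new Thread(() -> sawRunningJob.set(job.isAlive()), "client");
        client.start();

        // Release the lock only after the client thread has terminated (successfully or not),
        // so the job cannot finish before the client is done with it.
        join(client);
        jobLock.release();
        join(job);
        return sawRunningJob.get();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while joining " + thread.getName(), e);
        }
    }
}
```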
[GitHub] flink pull request #5665: [FLINK-8703][tests] Port WebFrontendITCase to Mini...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5665#discussion_r173743909 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java --- @@ -66,40 +73,44 @@ private static final int NUM_TASK_MANAGERS = 2; private static final int NUM_SLOTS = 4; - private static LocalFlinkMiniCluster cluster; + private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration(); - private static int port = -1; + @ClassRule + public static final MiniClusterResource CLUSTER = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + CLUSTER_CONFIGURATION, + NUM_TASK_MANAGERS, + NUM_SLOTS), + true + ); - @BeforeClass - public static void initialize() throws Exception { + private static Configuration getClusterConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - - File logDir = File.createTempFile("TestBaseUtils-logdir", null); - assertTrue("Unable to delete temp file", logDir.delete()); - assertTrue("Unable to create temp directory", logDir.mkdir()); - File logFile = new File(logDir, "jobmanager.log"); - File outFile = new File(logDir, "jobmanager.out"); - - Files.createFile(logFile.toPath()); - Files.createFile(outFile.toPath()); + try { + File logDir = File.createTempFile("TestBaseUtils-logdir", null); --- End diff -- Making it a rule is a bit tricky: the cluster is also a rule, so you would end up with a dependency between the two rules. ---
[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5679#discussion_r173742203 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java --- @@ -143,10 +152,66 @@ public int hashCode() { } /** -* Processing mode. +* JSON serializer for {@link ProcessingMode}. */ + @JsonSerialize(using = ProcessingModeSerializer.class) + @JsonDeserialize(using = ProcessingModeDeserializer.class) public enum ProcessingMode { - AT_LEAST_ONCE, - EXACTLY_ONCE + AT_LEAST_ONCE("at_least_once"), + EXACTLY_ONCE("exactly_once"); + + private String value; + + ProcessingMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ProcessingMode fromString(String value) { --- End diff -- these changes to the enum seem unnecessary ---
[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5679#discussion_r173742094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java --- @@ -143,10 +152,66 @@ public int hashCode() { } /** -* Processing mode. +* JSON serializer for {@link ProcessingMode}. */ + @JsonSerialize(using = ProcessingModeSerializer.class) + @JsonDeserialize(using = ProcessingModeDeserializer.class) public enum ProcessingMode { - AT_LEAST_ONCE, - EXACTLY_ONCE + AT_LEAST_ONCE("at_least_once"), + EXACTLY_ONCE("exactly_once"); + + private String value; + + ProcessingMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ProcessingMode fromString(String value) { + for (ProcessingMode mode : ProcessingMode.values()) { + if (mode.value.equalsIgnoreCase(value)) { + return mode; + } + } + + throw new IllegalArgumentException("No constant with value " + value + " found"); + } + + } + + /** +* JSON deserializer for {@link ProcessingMode}. +*/ + public static class ProcessingModeSerializer extends StdSerializer { + + public ProcessingModeSerializer() { + super(ProcessingMode.class); + } + + @Override + public void serialize(ProcessingMode mode, JsonGenerator generator, SerializerProvider serializerProvider) + throws IOException { + generator.writeString(mode.getValue()); + } } + + /** +* Processing mode deserializer. +*/ + public static class ProcessingModeDeserializer extends StdDeserializer { + + public ProcessingModeDeserializer() { + super(ProcessingMode.class); + } + + @Override + public ProcessingMode deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + return ProcessingMode.fromString(jsonParser.getValueAsString()); --- End diff -- replace with `ProcessingMode.valueOf(jsonParser.getValueAsString().toUpperCase())` ---
[GitHub] flink pull request #5679: [FLINK-8916] Checkpointing Mode is always shown to...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5679#discussion_r173741952 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java --- @@ -143,10 +152,66 @@ public int hashCode() { } /** -* Processing mode. +* JSON serializer for {@link ProcessingMode}. */ + @JsonSerialize(using = ProcessingModeSerializer.class) + @JsonDeserialize(using = ProcessingModeDeserializer.class) public enum ProcessingMode { - AT_LEAST_ONCE, - EXACTLY_ONCE + AT_LEAST_ONCE("at_least_once"), + EXACTLY_ONCE("exactly_once"); + + private String value; + + ProcessingMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ProcessingMode fromString(String value) { + for (ProcessingMode mode : ProcessingMode.values()) { + if (mode.value.equalsIgnoreCase(value)) { + return mode; + } + } + + throw new IllegalArgumentException("No constant with value " + value + " found"); + } + + } + + /** +* JSON deserializer for {@link ProcessingMode}. +*/ + public static class ProcessingModeSerializer extends StdSerializer { + + public ProcessingModeSerializer() { + super(ProcessingMode.class); + } + + @Override + public void serialize(ProcessingMode mode, JsonGenerator generator, SerializerProvider serializerProvider) + throws IOException { + generator.writeString(mode.getValue()); --- End diff -- replace with `generator.writeString(mode.name().toLowerCase())` ---
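The two suggestions amount to deriving the JSON value from the constant's name, which makes the extra `value` field and `fromString()` unnecessary. Below is a sketch of that mapping with the Jackson serializer/deserializer wiring left out; the class name `ProcessingModeCodec` is hypothetical. `Locale.ROOT` is added here (it is not part of the suggestion) so that the case conversion does not depend on the JVM's default locale. `Enum.valueOf` throws `IllegalArgumentException` for unknown names, which mirrors what the proposed `fromString()` does.

```java
import java.util.Locale;

/** Stand-alone copy of the enum for illustration: no extra value field needed. */
enum ProcessingMode {
    AT_LEAST_ONCE,
    EXACTLY_ONCE
}

/** Hypothetical sketch of the name-based mapping; Jackson wiring omitted. */
class ProcessingModeCodec {

    /** Serializer side: AT_LEAST_ONCE -> "at_least_once". */
    static String toJson(ProcessingMode mode) {
        return mode.name().toLowerCase(Locale.ROOT);
    }

    /** Deserializer side: "exactly_once" -> EXACTLY_ONCE; unknown values throw IllegalArgumentException. */
    static ProcessingMode fromJson(String value) {
        return ProcessingMode.valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```

Inside the actual serializer and deserializer, these bodies would correspond to `generator.writeString(...)` and `jsonParser.getValueAsString()` respectively.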
[GitHub] flink issue #5673: [FLINK-8832] [sql-client] Create a SQL Client Kafka 0.11 ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5673 How is this any different from a regular fat jar? I'm wondering what the purpose of the `sql-jar` suffix is. ---
[GitHub] flink pull request #5670: [FLINK-8904][cli][tests] always restore the previo...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5670#discussion_r173627882 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java --- @@ -65,9 +67,14 @@ public static String getInvalidConfigDir() { } public static void pipeSystemOutToNull() { + previousSysout = System.out; --- End diff -- Does it have to be done this way, or could we make `previousSysout` final? ---
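One way to make the saved stream `final` is to capture `System.out` once, when the utility class is initialized, instead of inside `pipeSystemOutToNull()`. A hypothetical sketch (not the actual `CliFrontendTestUtils` code; the class name `SysoutRedirect` is made up). A side benefit is that calling `pipeSystemOutToNull()` twice can no longer overwrite the saved stream with the null stream. The trade-off is that whatever `System.out` is at class-initialization time is what gets restored.

```java
import java.io.OutputStream;
import java.io.PrintStream;

/** Hypothetical sketch; not the actual CliFrontendTestUtils code. */
final class SysoutRedirect {

    // captured once when the class is initialized (i.e. on first use);
    // being final, it cannot be overwritten by a second pipe call
    private static final PrintStream ORIGINAL_SYSOUT = System.out;

    private SysoutRedirect() {}

    static void pipeSystemOutToNull() {
        System.setOut(new PrintStream(new OutputStream() {
            @Override
            public void write(int b) {
                // discard everything written to System.out
            }
        }));
    }

    static void restoreSystemOut() {
        System.setOut(ORIGINAL_SYSOUT);
    }
}
```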
[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5609 ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list the `-E` option. If we want to be super safe we could check that `sed -r` is supported and use `-E` as a backup. ---
[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r173455497 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java --- @@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception { public void testCancelingFullTopic() throws Exception { runCancelingOnFullInputTest(); } - --- End diff -- Ah right, I forgot to comment on that. I found these tests to be rather... _odd_. They check the behavior when not enough slots are available, but afaik in the old code this fails before the client even submits the job, and in flip6 this stalls because we never check whether enough slots are available (I guess with the underlying assumption that we would just allocate more TMs until we have enough). ---
[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5669 [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource ## What is the purpose of the change Ports the `KafkaTestBase` and extending classes to use `MiniClusterResource`. ## Verifying this change Run all tests extending `KafkaTestBase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_kafkaB Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5669 commit 545b4dcccf79ce1b8bb530bae77ab4c2b6e85351 Author: zentol <chesnay@...> Date: 2018-03-07T12:38:03Z Remove Kafka testFailOnDeploy test commit 3cdfd906cdefa85fe36b6717f41e831ca7e5ea72 Author: zentol <chesnay@...> Date: 2018-03-07T12:39:25Z [FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource ---
[GitHub] flink pull request #5668: [FLINK-8703][tests] Port StreamingScalabilityAndLa...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5668 [FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniCluste… ## What is the purpose of the change Ports the `StreamingScalabilityAndLatency` to use `MiniClusterResource`. ## Verifying this change Run `StreamingScalabilityAndLatency` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_stream Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5668.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5668 commit 08f56f6e954a37c9135bac891ad0bb6a31dcc8b0 Author: zentol <chesnay@...> Date: 2018-02-27T14:21:50Z [FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniClusterResource ---
[GitHub] flink pull request #5667: [FLINK-8703][tests] Port NotSoMiniClusterIteration...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5667 [FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource ## What is the purpose of the change Ports the `NotSoMiniClusterIterations` to use `MiniClusterResource`. ## Verifying this change Run `NotSoMiniClusterIterations` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_iter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5667.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5667 commit 858b2e4a31ad186b718c529a70757fda428a92ef Author: zentol <chesnay@...> Date: 2018-02-27T14:19:50Z [FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource ---
[GitHub] flink pull request #5666: [FLINK-8703][tests] Port KafkaShortRetentionTestBa...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5666 [FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterRe… ## What is the purpose of the change Ports the `KafkaShortRetentionTestBase` to use `MiniClusterResource`. ## Verifying this change Run `KafkaShortRetentionTestBase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_kafke_short Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5666.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5666 commit 2aa3b9cdb39f812d40fb0be88265d1b36b631661 Author: zentol <chesnay@...> Date: 2018-02-27T10:11:59Z [FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterResource ---
[GitHub] flink pull request #5665: [FLINK-8703][tests] Port WebFrontendITCase to Mini...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5665 [FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource ## What is the purpose of the change Ports the `WebFrontendITCase` to use `MiniClusterResource`. ## Brief change log * implement prerequisite `MiniClusterClient#listJobs()` * modify `MiniClusterResource` to expose WebUI/REST port * make `MiniClusterResource#CODEBASE_KEY/FLIP6_CODEBASE` public to support version-dependent test behavior (not pretty but the alternative would be a full copy of the test) * port WebFrontendITCase ## Verifying this change Run `WebFrontendITCase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_web Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5665.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5665 commit 41c5737545897506470f531394269b94ebe2ef12 Author: zentol <chesnay@...> Date: 2018-03-07T10:05:12Z [FLINK-8811][tests] Implement MiniClusterClient#listJobs commit 98f366238cddcef38fff08b14764c9120dbcccea Author: zentol <chesnay@...> Date: 2018-03-07T10:14:20Z [FLINK-8703][tests] Expose WebUI port commit cb903f3681c808a3e5011211af6b7d174f86a23e Author: zentol <chesnay@...> Date: 2018-03-07T10:14:46Z [FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource ---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5664 [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource ## What is the purpose of the change Ports the `CancelingTestBase` to use `MiniClusterResource`. ## Verifying this change Run `MapCancelingTestBase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8703_canceling Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5664.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5664 commit 57338df4819b2324f7ede2b131f81d83bc9096b2 Author: zentol <chesnay@...> Date: 2018-02-26T14:36:37Z [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource commit 22d4a2f02c256eb41a1684a5766a1dd53dc9351d Author: zentol <chesnay@...> Date: 2018-02-28T12:43:42Z [hotfix][tests] Properly disable JoinCancelingITCase ---
[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5573 please do not trigger builds just to get that perfect green build. The test failure here is quite common at the moment. ---
[GitHub] flink issue #5657: [FLINK-8887][tests] Add single retry in MiniClusterClient
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5657 merging. ---
[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5573 @yanghua This test failure is unrelated, you can ignore it. ---
[GitHub] flink issue #5554: [FLINK-8729][streaming] Refactor JSONGenerator to use jac...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5554 merging. ---
[GitHub] flink issue #5637: [FLINK-8860][flip6] stop SlotManager spamming logs for ev...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5637 merging. ---
[GitHub] flink pull request #5594: [FLINK-8800][REST] Reduce logging of all requests ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5594#discussion_r173266836 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -84,8 +84,8 @@ protected AbstractHandler( @Override protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gateway) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Received request " + routed.request().getUri() + '.'); + if (log.isTraceEnabled()) { --- End diff -- Yes, we could simplify this; however, we would be relying on implementation details of two libraries. ---
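For context on the trade-off: an explicit level guard avoids building the message when the level is disabled, while dropping the guard means relying on the logging library (and on the cost of the arguments) to avoid that work. The sketch below shows both variants using `java.util.logging` as a stand-in; the handler's `log` appears to be an slf4j logger, so this is a swapped-in library, and `RequestLogging` is a hypothetical name. `Logger.finest(Supplier<String>)` only invokes the supplier if FINEST is enabled for the logger.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch using java.util.logging as a stand-in for slf4j. */
class RequestLogging {

    // strong reference so the configured logger instance is not garbage collected
    private static final Logger LOG = Logger.getLogger(RequestLogging.class.getName());

    /** Explicit guard, as in the diff: the string is only built if FINEST is enabled. */
    static void logGuarded(String uri) {
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.finest("Received request " + uri + '.');
        }
    }

    /** No explicit guard: relies on the logger deferring the supplier until the level check passes. */
    static void logLazily(Supplier<String> uri) {
        LOG.finest(() -> "Received request " + uri.get() + '.');
    }
}
```

The guarded variant makes the "no work when disabled" behavior explicit in the calling code, whereas the lazy variant depends on how the logging API evaluates its arguments.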
[GitHub] flink issue #5651: [FLINK-8889][tests] Do not override cluster config values
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5651 merging. ---
[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5645#discussion_r172922843 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -163,8 +224,9 @@ public T copy(T from, T reuse) { @Override public void copy(DataInputView source, DataOutputView target) throws IOException { - T value = deserialize(source); - serialize(value, target); + // we do not have concurrency checks here, because serialize() and + // deserialize() do the checks and the current mechanism does not handle --- End diff -- looks like the end of the comment is missing ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893483 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_J
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893235 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5645#discussion_r172893030 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -163,8 +224,19 @@ public T copy(T from, T reuse) { @Override public void copy(DataInputView source, DataOutputView target) throws IOException { - T value = deserialize(source); - serialize(value, target); + if (CONCURRENT_ACCESS_CHECK) { + enterExclusiveThread(); + } + + try { + T value = deserialize(source); --- End diff -- I have to point out that after `deserialize()` returns, the checks in `copy()` are ineffective, as the `currentThread` field has already been nulled. In other words, we guard against concurrent access before `deserialize()` and within `deserialize()` and `serialize()`, but not between `deserialize()` and `serialize()` or after `serialize()`. This isn't a _problem_, as all code is actually covered, but we may want to document that. ---
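The effect can be reproduced with a guard modeled on the enter/exit pattern the comment refers to (hypothetical classes, not the actual `AvroSerializer` code). In `NaiveGuard`, `exit()` unconditionally clears the owner, so the nested `deserialize()` drops the guard set by `copy()`. `CountingGuard` is one possible alternative, should coverage between the nested calls ever be needed. Like the original check, both are unsynchronized sanity checks, not locks.

```java
/** Guard modeled on the enter/exit pattern described in the review (hypothetical). */
class NaiveGuard {
    private Thread currentThread;

    void enter() {
        Thread self = Thread.currentThread();
        if (currentThread == null) {
            currentThread = self;
        } else if (currentThread != self) {
            throw new IllegalStateException("Concurrent access by " + self.getName());
        }
    }

    void exit() {
        // clears the owner even if an outer caller (e.g. copy()) is still inside
        currentThread = null;
    }

    Thread owner() {
        return currentThread;
    }
}

/** Variant that counts nested enters, so an inner exit keeps the outer guard in place. */
class CountingGuard {
    private Thread currentThread;
    private int depth;

    void enter() {
        Thread self = Thread.currentThread();
        if (currentThread == null) {
            currentThread = self;
        } else if (currentThread != self) {
            throw new IllegalStateException("Concurrent access by " + self.getName());
        }
        depth++;
    }

    void exit() {
        // only release ownership when the outermost caller exits
        if (--depth == 0) {
            currentThread = null;
        }
    }

    Thread owner() {
        return currentThread;
    }
}
```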
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172857716 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172856770 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_J
[GitHub] flink issue #5656: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5656 You should probably clean this branch of the restore/QS tests and rebase it once. ---
[GitHub] flink pull request #5657: [FLINK-8887][tests] Add single retry in MiniCluste...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5657 [FLINK-8887][tests] Add single retry in MiniClusterClient ## What is the purpose of the change This PR presents a test workaround for race conditions in FLIP-6 (most notably FLINK-8887). Basically, every `MiniClusterClient` call is retried *once* after 500ms in case of certain exceptions. **This is only a band-aid until a proper fix is in place** so we can finally continue merging more test ports. ## Brief change log * add `guardWithSingleRetry` convenience method * add `ScheduledExecutor` to `MiniClusterClient` * guard all calls to the `MiniCluster` ## Verifying this change The change can be verified by cherry-picking [this branch](https://github.com/zentol/flink/tree/8797) and running the `AbstractOperatorRestoreTestBase`. Before this change, 1-2 tests always failed, whereas now none should fail. /cc @aljoscha @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8887_bandaid Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5657.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5657 commit f685fad6731b7c1774b247985a446260ea285663 Author: zentol <chesnay@...> Date: 2018-03-07T14:02:05Z [FLINK-8887][tests] Add single retry in MiniClusterClient ---
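A minimal sketch of the retry-once idea described in this PR, assuming a `CompletableFuture`-based call and a plain `java.util.concurrent.ScheduledExecutorService` in place of Flink's `ScheduledExecutor`. The method name mirrors `guardWithSingleRetry` from the change log, but the signature and body are illustrative, not the PR's code: a failure accepted by the predicate triggers exactly one delayed retry, and anything else is propagated unchanged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class SingleRetry {

    /**
     * Runs the operation; if it fails with an exception accepted by shouldRetry,
     * runs it exactly once more after delayMillis. Any other failure, and any
     * failure of the retry itself, is propagated unchanged.
     */
    static <T> CompletableFuture<T> guardWithSingleRetry(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> shouldRetry,
            long delayMillis,
            ScheduledExecutorService scheduler) {

        CompletableFuture<T> result = new CompletableFuture<>();
        invoke(operation).whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = unwrap(error);
            if (!shouldRetry.test(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            // Schedule the single retry; whatever it produces is final.
            scheduler.schedule(() -> {
                invoke(operation).whenComplete((retryValue, retryError) -> {
                    if (retryError == null) {
                        result.complete(retryValue);
                    } else {
                        result.completeExceptionally(unwrap(retryError));
                    }
                });
            }, delayMillis, TimeUnit.MILLISECONDS);
        });
        return result;
    }

    /** Turns a synchronous throw into a failed future so it cannot escape the callbacks. */
    private static <T> CompletableFuture<T> invoke(Supplier<CompletableFuture<T>> operation) {
        try {
            return operation.get();
        } catch (RuntimeException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

Keeping the predicate narrow (only the specific race-condition exceptions) is what stops a band-aid like this from masking unrelated failures.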
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5648#discussion_r172835665 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -404,7 +406,29 @@ public void start() throws Exception { final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); if (jobManagerRunner != null) { - return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); + CompletableFuture statusFuture = jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); + statusFuture.handle((JobStatus status, Throwable throwable) -> { + if (throwable != null) { + Throwable error = ExceptionUtils.stripCompletionException(throwable); + + if (error instanceof FencingTokenException) { --- End diff -- The missing `else` block causes all other exceptions to be swallowed. ---
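To illustrate the point, here is a hedged sketch of a `handle` callback whose `else` branch rethrows, so only the targeted exception is converted. `FencingTokenException` is a local stand-in for Flink's class, the job status is modeled as a `String`, and the fallback value is hypothetical because the diff is cut off before it shows what the workaround returns. The sketch also returns the future produced by `handle`; in the excerpt above, the result of `handle` is not assigned to anything.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class StatusFallback {

    /** Local stand-in for Flink's FencingTokenException. */
    static final class FencingTokenException extends RuntimeException {
        FencingTokenException(String message) {
            super(message);
        }
    }

    /**
     * Replaces a FencingTokenException with a (hypothetical) fallback status and
     * propagates every other failure instead of swallowing it.
     */
    static CompletableFuture<String> withFallback(CompletableFuture<String> statusFuture, String fallbackStatus) {
        // handle() creates a new future; returning it is what makes the handling visible to callers.
        return statusFuture.handle((status, throwable) -> {
            if (throwable == null) {
                return status;
            }
            Throwable error = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
            if (error instanceof FencingTokenException) {
                return fallbackStatus;
            } else {
                // Rethrow so callers still see every unexpected failure.
                throw new CompletionException(error);
            }
        });
    }
}
```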
[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5653#discussion_r172812790 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java --- @@ -290,16 +289,14 @@ public static boolean checkpointsMatch( Collection first, Collection second) { --- End diff -- Ah, I would've expected that CompletedCheckpoint has an equals method, never mind then. ---
[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5653#discussion_r172808666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java --- @@ -290,16 +289,14 @@ public static boolean checkpointsMatch( Collection first, Collection second) { --- End diff -- Change the type to `List`? Then we could skip the copy and do the comparison directly. ---
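If the parameters were `List`s, the ordered comparison could walk both inputs directly instead of building intermediate tuple lists. A sketch under that assumption, with a hypothetical `Checkpoint` class reduced to the two fields the existing code compares (checkpoint ID and job ID). If a copy is kept after all, pre-sizing it with `new ArrayList<>(first.size())` avoids resizing, as the follow-up comments suggest.

```java
import java.util.Iterator;
import java.util.List;

final class CheckpointMatch {

    /** Hypothetical stand-in for CompletedCheckpoint, reduced to the compared fields. */
    static final class Checkpoint {
        final long checkpointId;
        final String jobId;

        Checkpoint(long checkpointId, String jobId) {
            this.checkpointId = checkpointId;
            this.jobId = jobId;
        }
    }

    /** True if both lists hold the same (checkpointId, jobId) pairs in the same order. */
    static boolean checkpointsMatch(List<Checkpoint> first, List<Checkpoint> second) {
        if (first.size() != second.size()) {
            return false;
        }
        // Walk both lists in lockstep; no intermediate copies are needed.
        Iterator<Checkpoint> left = first.iterator();
        Iterator<Checkpoint> right = second.iterator();
        while (left.hasNext()) {
            Checkpoint a = left.next();
            Checkpoint b = right.next();
            if (a.checkpointId != b.checkpointId || !a.jobId.equals(b.jobId)) {
                return false;
            }
        }
        return true;
    }
}
```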
[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5653#discussion_r172807476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java --- @@ -290,16 +289,14 @@ public static boolean checkpointsMatch( Collection first, Collection second) { - Set<Tuple2<Long, JobID>> firstInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(); --- End diff -- initialize with `first.size()` ---
[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5653#discussion_r172809760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java --- @@ -290,16 +289,14 @@ public static boolean checkpointsMatch( Collection first, Collection second) { - Set<Tuple2<Long, JobID>> firstInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(); for (CompletedCheckpoint checkpoint : first) { --- End diff -- For robustness, we may want to check whether the input collections are actually sorted. I know that the sole caller, `ZooKeeperCompletedCheckpointStore`, does in fact pass sorted lists, but technically nothing guarantees this at the moment. ---
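One possible guard for this, sketched under the assumption that checkpoint IDs within one store are unique and should therefore be strictly ascending. The helper names are hypothetical, and the IDs are passed as a plain `List<Long>` to keep the example standalone.

```java
import java.util.Iterator;
import java.util.List;

final class CheckpointOrder {

    /** True if the IDs are strictly ascending (assumes IDs are unique within one store). */
    static boolean isStrictlyAscending(List<Long> checkpointIds) {
        Iterator<Long> it = checkpointIds.iterator();
        if (!it.hasNext()) {
            return true;
        }
        long previous = it.next();
        while (it.hasNext()) {
            long current = it.next();
            if (current <= previous) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    /** Fails fast instead of silently comparing unsorted input. */
    static void checkSorted(List<Long> checkpointIds, String name) {
        if (!isStrictlyAscending(checkpointIds)) {
            throw new IllegalArgumentException(name + " is not sorted by checkpoint ID: " + checkpointIds);
        }
    }
}
```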
[GitHub] flink pull request #5653: [FLINK-8890] Compare checkpoints with order in Com...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5653#discussion_r172807512 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java --- @@ -290,16 +289,14 @@ public static boolean checkpointsMatch( Collection first, Collection second) { - Set<Tuple2<Long, JobID>> firstInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(); for (CompletedCheckpoint checkpoint : first) { firstInterestingFields.add( new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId())); } - Set<Tuple2<Long, JobID>> secondInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> secondInterestingFields = new ArrayList<>(); --- End diff -- initialize with `second.size()` ---
[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5652 yup. But one profile is already scratching the 50-minute limit as is :/ ---
[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5652 The alternative would be to make the `ClusterClient` functionality optional and force tests to explicitly enable it. ---
[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5652 All legacy tests going through the `MiniClusterResource` will take longer. I don't know by how much, but we now have to start multiple actor systems and the JM<->TM communication is no longer local. ---
[GitHub] flink pull request #5652: [hotfix][tests] Do not use singleActorSystem in Lo...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5652 [hotfix][tests] Do not use singleActorSystem in LocalFlinkMiniCluster ## What is the purpose of the change The legacy cluster started in `MiniClusterResource` used a single actor system, which rendered the returned `ClusterClient` unusable. This change will unfortunately cause tests to take longer, but I don't know how to fix this in another way. On every access, you would get the following exception:
```
org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
	at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:513)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:113)
Caused by: org.apache.flink.util.FlinkException: Could not find out our own hostname by connecting to the leading JobManager. Please make sure that the Flink cluster has been started.
	at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:248)
	at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:923)
	at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:511)
	... 30 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not find the connecting address by connecting to the current leader.
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:164)
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:145)
	at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:244)
	... 32 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the connecting address to the current leader with the akka URL akka://flink/user/jobmanager_1.
	at org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:472)
	at org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:361)
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.java:162)
	... 34 more
Caused by: java.lang.Exception: Could not retrieve InetSocketAddress from Akka URL akka://flink/user/jobmanager_1
	at org.apache.flink.runtime.akka.AkkaUtils$.getInetSocketAddressFromAkkaURL(AkkaUtils.scala:709)
	at org.apache.flink.runtime.akka.AkkaUtils.getInetSocketAddressFromAkkaURL(AkkaUtils.scala)
	at org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener.findConnectingAddress(ConnectionUtils.java:392)
	... 36 more
```
You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink hotfix_single Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5652 commit 6a105cbb194b87dec98224b985ee5ceb9239d492 Author: zentol <chesnay@...> Date: 2018-03-05T12:45:33Z [hotfix][tests] Do not use singleActorSystem in LocalFlinkMiniCluster Using a singleActorSystem rendered the returned client unusable. ---
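The last `Caused by` in the trace points at the root problem: `akka://flink/user/jobmanager_1` is a local actor path that names only the actor system, so there is no host or port from which to build an `InetSocketAddress`. Remote Akka URLs take the form `akka.tcp://system@host:port/...`. The sketch below illustrates the difference with `java.net.URI`; it is not how `AkkaUtils` is implemented, and port 6123 is just an example value.

```java
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Optional;

final class AkkaUrlAddress {

    /**
     * Tries to extract host and port from an Akka actor URL.
     * Remote form: akka.tcp://system@host:port/user/actor
     * Local form:  akka://system/user/actor (system name only, no host/port)
     */
    static Optional<InetSocketAddress> socketAddressOf(String akkaUrl) {
        URI uri = URI.create(akkaUrl);
        // In the local form the authority is just the system name, so there is
        // no "system@" user-info part and no port to connect to.
        if (uri.getUserInfo() == null || uri.getHost() == null || uri.getPort() == -1) {
            return Optional.empty();
        }
        // createUnresolved avoids a DNS lookup; the caller decides when to resolve.
        return Optional.of(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
    }
}
```

For example, `socketAddressOf("akka://flink/user/jobmanager_1")` yields an empty `Optional`, while `socketAddressOf("akka.tcp://flink@localhost:6123/user/jobmanager_1")` yields `localhost:6123`.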
[GitHub] flink pull request #5651: [FLINK-8889][tests] Do not override cluster config...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5651 [FLINK-8889][tests] Do not override cluster config values ## What is the purpose of the change This PR changes how `MiniClusterResource` and `TestBaseUtils` set up the cluster configuration so that several options are no longer unconditionally overridden. They are now only set if they weren't already configured (i.e., by the test itself). ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8889 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5651.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5651 commit dafff1946e20e6f3884398b705bbeb4f70462efa Author: zentol <chesnay@...> Date: 2018-03-07T09:18:03Z [FLINK-8889][tests] Do not override cluster config values ---
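The "only set if not already configured" rule amounts to a put-if-absent merge. A minimal sketch, with a plain `Map<String, String>` standing in for Flink's `Configuration`; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class ClusterConfigDefaults {

    /**
     * Returns the test's configuration with cluster defaults filled in only for
     * keys the test did not set itself; explicit test values always win.
     */
    static Map<String, String> applyDefaults(Map<String, String> testConfig, Map<String, String> defaults) {
        // Copy first so the caller's map is left untouched.
        Map<String, String> merged = new HashMap<>(testConfig);
        defaults.forEach(merged::putIfAbsent);
        return merged;
    }
}
```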
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5648#discussion_r172775779 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -404,16 +404,25 @@ public void start() throws Exception { final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); if (jobManagerRunner != null) { - return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); - } else { - final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId); - - if (jobDetails != null) { - return CompletableFuture.completedFuture(jobDetails.getStatus()); - } else { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); --- End diff -- This is an asynchronous call; it doesn't throw the exception, it completes the returned `CompletableFuture` exceptionally. You have to add a handler to that future. It also only properly resolves one of the exceptions, and IMO it shouldn't catch `Exception` but only the specific exceptions we want the workaround to cover, so as not to hide other issues. In any case, I'm not sure if adding workarounds to the Dispatcher is the right way to go. These issues revealed that some scenarios are not properly handled, and I would prefer waiting for @tillrohrmann to really fix this in the Dispatcher and related components. We can temporarily handle both exceptions in the `MiniClusterClient` by adding a *single* retry (with a short sleep) if a **specific** exception occurs. ---
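The distinction matters because a `try`/`catch` around a method that returns a `CompletableFuture` only guards the creation of the future; a failure that happens later on another thread completes the future exceptionally and never reaches the `catch`. A standalone sketch, where `requestStatus` is a hypothetical stand-in for `requestJobStatus` and `IllegalStateException` plays the role of the one specific exception the workaround targets.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;

final class AsyncFailureDemo {

    /** Hypothetical async request that fails on another thread. */
    static CompletableFuture<String> requestStatus(ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("failed on the RPC thread");
        }, executor);
    }

    /** Does NOT work: the try block only covers creating the future. */
    static CompletableFuture<String> withTryCatch(ExecutorService executor) {
        try {
            return requestStatus(executor);
        } catch (IllegalStateException e) {
            return CompletableFuture.completedFuture("recovered");
        }
    }

    /** Works: the failure is handled on the future; other exception types still propagate. */
    static CompletableFuture<String> withHandler(ExecutorService executor) {
        return requestStatus(executor).exceptionally(throwable -> {
            // supplyAsync wraps the original exception in a CompletionException.
            Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof IllegalStateException) {
                return "recovered";
            }
            throw new CompletionException(cause);
        });
    }
}
```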
[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r172756524 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto }); } + @Override + public Map<String, Object> getAccumulators(final JobID jobID) throws Exception { + final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); + final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); + accMsgParams.jobPathParameter.resolve(jobID); + accMsgParams.queryParameter.resolve(Collections.singletonList(true)); + + CompletableFuture responseFuture = sendRequest( + accumulatorsHeaders, + accMsgParams + ); + + return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { + if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) { + return accumulatorsInfo.getSerializedUserAccumulators(); --- End diff -- The accumulators should be deserialized via `SerializedValue#deserializeValue(ClassLoader)`. If `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)` (which should also be overridden) was called, use the passed-in `ClassLoader`; otherwise use `ClassLoader.getSystemClassLoader()`. ---
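Roughly, deserializing with an explicit class loader means resolving every class in the serialized stream through that loader, falling back to the system class loader when none is supplied. The sketch uses plain Java serialization with a `resolveClass` override so it runs on its own; it illustrates the idea and is not Flink's `SerializedValue` or `RestClusterClient` code. `deserializeAll` and `ClassLoaderObjectInputStream` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class AccumulatorDeserialization {

    /** ObjectInputStream that resolves classes through a caller-supplied ClassLoader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Primitive types and similar cases fall back to the default resolution.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes every accumulator with the given loader, or the system loader if none is given. */
    static Map<String, Object> deserializeAll(Map<String, byte[]> serialized, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ClassLoader effective = loader != null ? loader : ClassLoader.getSystemClassLoader();
        Map<String, Object> result = new HashMap<>(serialized.size());
        for (Map.Entry<String, byte[]> entry : serialized.entrySet()) {
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(entry.getValue()), effective)) {
                result.put(entry.getKey(), in.readObject());
            }
        }
        return result;
    }
}
```

Passing the user-code class loader matters for accumulators whose types live in the job JAR; the system class loader only sees classes on the client's own classpath.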
[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r172755830 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto }); } + @Override + public Map<String, Object> getAccumulators(final JobID jobID) throws Exception { --- End diff -- we should also override `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)` ---
[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r172756640 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * request parameter for job accumulator's handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}. + */ +public class JobAccumulatorsMessageParameters extends JobMessageParameters { + + public final AccumulatorsIncludeSerializedValueQueryParameter queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter(); --- End diff -- field name is a bit generic; how about `includeSerializedAccumulators`? ---
[GitHub] flink issue #5637: [FLINK-8860][flip6] stop SlotManager spamming logs for ev...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5637 I would go for `TRACE` given how often we set the log level to `DEBUG`. ---
[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5573 We may want to delay merging this until [FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been addressed, as it voids the primary use-case of this handler. ---