[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2584 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r8653

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---
    @@ -20,62 +20,49 @@
     import org.apache.flink.core.fs.FSDataInputStream;
     import org.apache.flink.runtime.state.StreamStateHandle;
    -import org.apache.flink.util.InstantiationUtil;
     import org.apache.flink.util.Preconditions;
     import java.io.IOException;
    -import java.io.Serializable;
    -import java.util.Arrays;
     /**
      * A state handle that contains stream state in a byte array.
      */
     public class ByteStreamStateHandle implements StreamStateHandle {
    -    private static final long serialVersionUID = -5280226231200217594L;
    +    private static final long serialVersionUID = -5280226231202517594L;
         /**
    -     * the state data
    +     * The state data.
          */
         protected final byte[] data;
         /**
    +     * A unique name of by which this state handle is identified and compared. Like a filename, all
    +     * {@link ByteStreamStateHandle} with the exact same name must also have the exact same content in data.
    +     */
    +    protected final String handleName;
    --- End diff --

    Previously, two ByteStreamStateHandle instances were considered equal (and had the same hash code) if their byte[] had the same content. However, as with files, it should be possible to have two handles with the same content that are not considered equal (e.g. some tests created the same content for different handles, and some handles were deduplicated away because of this). From this point of view, the string mimics a "file name".
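The "file name" idea in this comment can be illustrated with a minimal sketch. `NamedByteHandle` is a hypothetical stand-in, not Flink's `ByteStreamStateHandle`; the assumption is only that equality and hash code are derived from the name and never from the bytes:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for illustration only; not the actual Flink class.
final class NamedByteHandle {
    private final String handleName; // identity of the handle, like a file name
    private final byte[] data;       // content; deliberately not part of equals/hashCode

    NamedByteHandle(String handleName, byte[] data) {
        this.handleName = Objects.requireNonNull(handleName);
        this.data = Objects.requireNonNull(data);
    }

    byte[] getData() {
        return data;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NamedByteHandle)) {
            return false;
        }
        return handleName.equals(((NamedByteHandle) o).handleName);
    }

    @Override
    public int hashCode() {
        return handleName.hashCode();
    }

    // Counts how many handles survive deduplication in a hash set.
    static int distinctCount(NamedByteHandle... handles) {
        Set<NamedByteHandle> set = new HashSet<>();
        for (NamedByteHandle h : handles) {
            set.add(h);
        }
        return set.size();
    }
}
```

With content-based equality, two handles holding the same bytes would collapse into one set entry; with name-based equality they stay distinct as long as their names differ.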
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82221594

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---
    @@ -290,6 +290,7 @@ public static void serializeStreamStateHandle(StreamStateHandle stateHandle, Dat
         } else if (stateHandle instanceof ByteStreamStateHandle) {
             dos.writeByte(BYTE_STREAM_STATE_HANDLE);
             ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
    +        dos.writeUTF(byteStreamStateHandle.getHandleName());
    --- End diff --

    I think so, too. It will be changed again soon.
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82221108

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---
    @@ -219,7 +218,7 @@ public void write(byte[] b, int off, int len) throws IOException {
         @Override
         public long getPos() throws IOException {
    -        return outStream == null ? pos : outStream.getPos();
    +        return pos + (outStream == null ? 0 : outStream.getPos());
    --- End diff --

    It was buggy before, and I think there are generally no tests for this inner class.
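A minimal sketch of why the position has to be a sum, assuming a simplified buffered stream. `BufferedPositionStream` and its fields are hypothetical and only mirror the names in the diff; an in-memory `ByteArrayOutputStream` stands in for the file stream, and its `size()` plays the role of `getPos()`:

```java
import java.io.ByteArrayOutputStream;

// Hypothetical simplification of a buffered checkpoint output stream.
final class BufferedPositionStream {
    private final byte[] writeBuffer;
    private int pos;                         // bytes currently held in writeBuffer
    private ByteArrayOutputStream outStream; // created lazily on the first flush

    BufferedPositionStream(int bufferSize) {
        this.writeBuffer = new byte[bufferSize];
    }

    void write(int b) {
        if (pos >= writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    void flush() {
        if (outStream == null) {
            outStream = new ByteArrayOutputStream();
        }
        outStream.write(writeBuffer, 0, pos);
        pos = 0;
    }

    // Behaviour before the fix: bytes still in the buffer are ignored once outStream exists.
    long getPosOld() {
        return outStream == null ? pos : outStream.size();
    }

    // Behaviour after the fix: flushed bytes plus bytes still in the buffer.
    long getPosFixed() {
        return pos + (outStream == null ? 0 : outStream.size());
    }

    // Convenience factory: writes `count` bytes into a stream with the given buffer size.
    static BufferedPositionStream afterWriting(int bufferSize, int count) {
        BufferedPositionStream s = new BufferedPositionStream(bufferSize);
        for (int i = 0; i < count; i++) {
            s.write(i);
        }
        return s;
    }
}
```

With a 4-byte buffer and 6 bytes written, 4 bytes have been flushed and 2 are still buffered, so the old expression reports 4 while the fixed one reports 6.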
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82216509

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -259,28 +263,29 @@ public void restorePartitionedState(List state) throws Exc
         try {
    -        fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
    +        fsDataInputStream = keyGroupsHandle.openInputStream();
             cancelStreamRegistry.registerClosable(fsDataInputStream);
             DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
             int numKvStates = inView.readShort();
    -        Map<Integer, String> kvStatesById = new HashMap<>(numKvStates);
    -
             for (int i = 0; i < numKvStates; ++i) {
                 String stateName = inView.readUTF();
    -            TypeSerializer namespaceSerializer =
    +            TypeSerializer namespaceSerializer =
                     InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    -            TypeSerializer stateSerializer =
    +            TypeSerializer stateSerializer =
                     InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    -            StateTable stateTable = new StateTable(stateSerializer,
    -                    namespaceSerializer,
    -                    keyGroupRange);
    -            stateTables.put(stateName, stateTable);
    -            kvStatesById.put(i, stateName);
    +            StateTable stateTable = stateTables.get(stateName);
    +
    +            if (null == stateTable) {
    --- End diff --

    Let's add a comment here that it is important to check for previously restored state first.
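The reason the check matters can be sketched as follows. This is a simplified model under stated assumptions: `RestoreSketch` is hypothetical, plain maps stand in for Flink's `StateTable`, and each "handle" is a map from state name to the entries it carries. When parallelism is reduced, one backend restores several handles, and creating a fresh table for every handle discards what an earlier handle restored:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: state name -> (key -> value) stands in for the state tables.
final class RestoreSketch {

    // Builds a single-entry handle for one state name.
    static Map<String, Map<String, Integer>> handle(String stateName, String key, int value) {
        Map<String, Integer> entries = Collections.singletonMap(key, value);
        return Collections.singletonMap(stateName, entries);
    }

    // Restores all handles into one set of tables. With reuseExisting == false the
    // table is recreated for every handle, which models the overwrite bug.
    static Map<String, Map<String, Integer>> restore(
            List<Map<String, Map<String, Integer>>> handles, boolean reuseExisting) {
        Map<String, Map<String, Integer>> stateTables = new HashMap<>();
        for (Map<String, Map<String, Integer>> handle : handles) {
            for (Map.Entry<String, Map<String, Integer>> e : handle.entrySet()) {
                // Important: look for a table restored from an earlier handle first.
                Map<String, Integer> table = reuseExisting ? stateTables.get(e.getKey()) : null;
                if (table == null) {
                    table = new HashMap<>();
                    stateTables.put(e.getKey(), table);
                }
                table.putAll(e.getValue());
            }
        }
        return stateTables;
    }
}
```

Restoring two handles that both carry the state `count` keeps both keys only when the existing table is reused; otherwise the second handle replaces the first.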
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82213262

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---
    @@ -663,28 +826,42 @@ public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws
             public void restoreState(Integer state) throws Exception {
                 counter = state;
             }
    +    }
    +
    +    private static class PartitionedStateSource extends StateSourceBase implements ListCheckpointed {
    +
    +        private static final long serialVersionUID = -359715965103593462L;
    +        private static final int NUM_PARTITIONS = 7;
    +
    +        private static int[] CHECK_CORRECT_SNAPSHOT;
    +        private static int[] CHECK_CORRECT_RESTORE;

             @Override
    -        public void run(SourceContext ctx) throws Exception {
    -            final Object lock = ctx.getCheckpointLock();
    +        public List snapshotState(long checkpointId, long timestamp) throws Exception {

    -            while (running) {
    -                synchronized (lock) {
    -                    counter++;
    +            CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;

    -                    ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask());
    -                }
    +            int div = counter / NUM_PARTITIONS;
    +            int mod = counter % NUM_PARTITIONS;

    -                Thread.sleep(2);
    -                if(counter == 10) {
    -                    workStartedLatch.countDown();
    +            List split = new ArrayList<>();
    +            for (int i = 0; i < NUM_PARTITIONS; ++i) {
    +                int partitionValue = div;
    +                if (mod > 0) {
    +                    --mod;
    +                    ++partitionValue;
                     }
    +                split.add(partitionValue);
                 }
    +            return split;
             }

             @Override
    -        public void cancel() {
    -            running = false;
    +        public void restoreState(List state) throws Exception {
    +            for(Integer v : state) {
    --- End diff --

    Missing whitespace after `for`
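The splitting logic in `snapshotState` above can be tried in isolation with a small sketch. `CounterSplit` is a hypothetical helper class; the `split` method follows the diff, while `merge` assumes that restoring sums the received partitions, which the truncated diff does not show:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the snapshot logic from the diff.
final class CounterSplit {

    // Splits counter into numPartitions parts that differ by at most one.
    static List<Integer> split(int counter, int numPartitions) {
        int div = counter / numPartitions;
        int mod = counter % numPartitions;
        List<Integer> split = new ArrayList<>();
        for (int i = 0; i < numPartitions; ++i) {
            int partitionValue = div;
            if (mod > 0) {
                --mod;
                ++partitionValue;
            }
            split.add(partitionValue);
        }
        return split;
    }

    // Assumption: restoring adds up whatever partitions a subtask receives.
    static int merge(List<Integer> parts) {
        int sum = 0;
        for (Integer v : parts) {
            sum += v;
        }
        return sum;
    }
}
```

For a counter of 23 and 7 partitions, the first two partitions get 4 and the remaining five get 3, so the parts always add back up to the original counter regardless of how they are redistributed.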
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82217757

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---
    @@ -20,62 +20,49 @@
     import org.apache.flink.core.fs.FSDataInputStream;
     import org.apache.flink.runtime.state.StreamStateHandle;
    -import org.apache.flink.util.InstantiationUtil;
     import org.apache.flink.util.Preconditions;
     import java.io.IOException;
    -import java.io.Serializable;
    -import java.util.Arrays;
     /**
      * A state handle that contains stream state in a byte array.
      */
     public class ByteStreamStateHandle implements StreamStateHandle {
    -    private static final long serialVersionUID = -5280226231200217594L;
    +    private static final long serialVersionUID = -5280226231202517594L;
         /**
    -     * the state data
    +     * The state data.
          */
         protected final byte[] data;
         /**
    +     * A unique name of by which this state handle is identified and compared. Like a filename, all
    +     * {@link ByteStreamStateHandle} with the exact same name must also have the exact same content in data.
    +     */
    +    protected final String handleName;
    --- End diff --

    Is this meant to be an optimization for `equals/hashCode`? If yes, has it been an issue before?
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82211635

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---
    @@ -219,7 +218,7 @@ public void write(byte[] b, int off, int len) throws IOException {
         @Override
         public long getPos() throws IOException {
    -        return outStream == null ? pos : outStream.getPos();
    +        return pos + (outStream == null ? 0 : outStream.getPos());
    --- End diff --

    So this was buggy before? Is there a test for this?
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82211023

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---
    @@ -297,7 +296,7 @@ public StreamStateHandle closeAndGetHandle() throws IOException {
         if (outStream == null && pos <= localStateThreshold) {
             closed = true;
             byte[] bytes = Arrays.copyOf(writeBuffer, pos);
    -        return new ByteStreamStateHandle(bytes);
    +        return new ByteStreamStateHandle(String.valueOf(createStatePath()), bytes);
    --- End diff --

    Why don't we do `createStatePath().toString()`? We actually don't want a non-unique `"null"` String here in case `createStatePath()` returns `null` (which it shouldn't anyway).
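The difference between the two calls can be shown with plain Java. `NullNameDemo` is a hypothetical class, and an `Object` parameter stands in for the path returned by `createStatePath()`:

```java
// Hypothetical demo class; `path` stands in for the result of createStatePath().
final class NullNameDemo {

    // String.valueOf(Object) silently maps a null reference to the text "null".
    static String viaValueOf(Object path) {
        return String.valueOf(path);
    }

    // toString() on a null reference throws a NullPointerException instead.
    static String viaToString(Object path) {
        return path.toString();
    }

    // Returns true if the toString() variant fails fast for a null path.
    static boolean toStringFailsFast() {
        try {
            viaToString(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

With `String.valueOf`, every handle created from a `null` path would share the name `"null"` and therefore compare as equal; with `toString()`, the unexpected `null` surfaces immediately as an exception.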
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82211497

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---
    @@ -290,6 +290,7 @@ public static void serializeStreamStateHandle(StreamStateHandle stateHandle, Dat
         } else if (stateHandle instanceof ByteStreamStateHandle) {
             dos.writeByte(BYTE_STREAM_STATE_HANDLE);
             ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
    +        dos.writeUTF(byteStreamStateHandle.getHandleName());
    --- End diff --

    I think it's OK that we don't change the format version (e.g. to v2) when we change the serialization format in a snapshot version, right? The version has already been updated since 1.1, which is fine.
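To make the format change concrete, here is a minimal round-trip sketch. `HandleFormatSketch`, the tag value, and the field order after the name (length, then bytes) are assumptions for illustration; only the `writeByte` tag followed by `writeUTF` of the handle name is taken from the diff:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a tagged handle record; not the actual savepoint format.
final class HandleFormatSketch {
    static final byte BYTE_STREAM_STATE_HANDLE = 1; // assumed tag value

    static byte[] serialize(String handleName, byte[] data) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeByte(BYTE_STREAM_STATE_HANDLE);
            dos.writeUTF(handleName);   // the field added by the diff
            dos.writeInt(data.length);  // assumed: length prefix for the payload
            dos.write(data);
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String deserializeName(byte[] bytes) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            dis.readByte();             // skip the type tag
            return dis.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] deserializeData(byte[] bytes) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            dis.readByte();             // skip the type tag
            dis.readUTF();              // skip the handle name
            byte[] data = new byte[dis.readInt()];
            dis.readFully(data);
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A reader that does not expect the name field would interpret the UTF length prefix as part of the payload length, which is why a change like this normally raises the versioning question asked in the comment.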
[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...
GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2584

    [FLINK-4731] Bugfix for HeapKeyedStateBackend scale-in and additional tests in RescalingITCase

    Restoring the HeapKeyedStateBackend was broken when parallelism is reduced. The restore method was overwriting previously restored state.

    Added scale-in testing to the RescalingITCase to catch problems like this in the future.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink stream-keyed-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2584.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2584

commit 12a981ee478ec6cd29d768c5e4166aad97e6e06b
Author: Stefan Richter
Date:   2016-10-02T12:15:48Z

    [hotfix] Make KeyGroupsStateHandle implement StreamStateHandle

commit 91f76867df790eba9fec793f4e276b7b05aebb6c
Author: Stefan Richter
Date:   2016-10-02T09:30:59Z

    [hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test

commit a7614c1b583f220dcc78a8130494552bc085a13f
Author: Stefan Richter
Date:   2016-10-02T12:19:37Z

    [FLINK-4730] Introducing CheckpointMetaData

commit c29804d95e5baa52f6ce47a7a1546ec3797fa9d2
Author: Stefan Richter
Date:   2016-10-02T14:56:41Z

    [FLINK-4731] Bugfix for HeapKeyedStateBackend scale-in and additional tests in RescalingITCase