[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2584


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r8653
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 ---
@@ -20,62 +20,49 @@
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
 
 /**
  * A state handle that contains stream state in a byte array.
  */
 public class ByteStreamStateHandle implements StreamStateHandle {
 
-   private static final long serialVersionUID = -5280226231200217594L;
+   private static final long serialVersionUID = -5280226231202517594L;
 
/**
-* the state data
+* The state data.
 */
protected final byte[] data;
 
/**
+* A unique name of by which this state handle is identified and 
compared. Like a filename, all
+* {@link ByteStreamStateHandle} with the exact same name must also 
have the exact same content in data.
+*/
+   protected final String handleName;
--- End diff --

The problem before was that two ByteStreamStateHandle instances were considered 
equal (and had the same hash code) if their byte[] had the same content. However, 
as with files, it should be possible to have two handles with the same content 
that are not considered equal (e.g. some tests created the same content for 
different handles, and some handles were deduplicated away because of this). 
From this point of view, the string mimics a "file name".
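
As a rough illustration of the "file name" idea, here is a minimal sketch of a handle whose equality depends only on its name. `NamedByteHandle` and `distinctCount` are hypothetical names made up for this example; this is not the actual ByteStreamStateHandle code from the PR.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a byte-array state handle; not the Flink class.
final class NamedByteHandle {
    private final String handleName; // identity, like a file name
    private final byte[] data;       // content, deliberately not part of equality

    NamedByteHandle(String handleName, byte[] data) {
        this.handleName = Objects.requireNonNull(handleName);
        this.data = Objects.requireNonNull(data);
    }

    byte[] getData() {
        return data;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NamedByteHandle)) {
            return false;
        }
        // Only the name is compared, never the bytes.
        return handleName.equals(((NamedByteHandle) o).handleName);
    }

    @Override
    public int hashCode() {
        return handleName.hashCode();
    }

    // Counts how many handles survive deduplication in a hash set.
    static int distinctCount(NamedByteHandle... handles) {
        Set<NamedByteHandle> set = new HashSet<>();
        for (NamedByteHandle h : handles) {
            set.add(h);
        }
        return set.size();
    }
}
```

With content-based equality, two handles holding the same bytes would collapse into one entry of a set or map; with name-based equality they stay distinct as long as their names differ.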




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82221594
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 ---
@@ -290,6 +290,7 @@ public static void 
serializeStreamStateHandle(StreamStateHandle stateHandle, Dat
} else if (stateHandle instanceof ByteStreamStateHandle) {
dos.writeByte(BYTE_STREAM_STATE_HANDLE);
ByteStreamStateHandle byteStreamStateHandle = 
(ByteStreamStateHandle) stateHandle;
+   dos.writeUTF(byteStreamStateHandle.getHandleName());
--- End diff --

I think so, too. It will be changed again soon.




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82221108
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 ---
@@ -219,7 +218,7 @@ public void write(byte[] b, int off, int len) throws 
IOException {
 
@Override
public long getPos() throws IOException {
-   return outStream == null ? pos : outStream.getPos();
+   return pos + (outStream == null ? 0 : 
outStream.getPos());
--- End diff --

It was buggy before and I think there are in general no tests for this 
inner class.
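
To make the difference between the two `getPos()` variants concrete, here is a small sketch. It assumes that `pos` counts only the bytes still sitting in the write buffer and is reset to zero on flush; that is my reading of the diff, not something stated in the thread. `BufferedPositionSketch` and `getPosOld` are hypothetical names, and a `ByteArrayOutputStream` stands in for the real file stream.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical, simplified model of a buffering output stream.
final class BufferedPositionSketch {
    private final byte[] writeBuffer;
    private int pos;                         // bytes currently in writeBuffer (assumption)
    private ByteArrayOutputStream outStream; // created lazily on the first flush

    BufferedPositionSketch(int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.writeBuffer = new byte[bufferSize];
    }

    void write(int b) {
        if (pos >= writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    void flush() {
        if (outStream == null) {
            outStream = new ByteArrayOutputStream();
        }
        outStream.write(writeBuffer, 0, pos);
        pos = 0;
    }

    // Old variant: ignores buffered bytes once the underlying stream exists.
    long getPosOld() {
        return outStream == null ? pos : outStream.size();
    }

    // New variant: flushed bytes plus bytes still in the buffer.
    long getPos() {
        return pos + (outStream == null ? 0 : outStream.size());
    }
}
```

With a 4-byte buffer and 6 bytes written, the old variant reports 4 (only the flushed part), while the new variant reports 6.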




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82216509
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -259,28 +263,29 @@ public void 
restorePartitionedState(List state) throws Exc
 
try {
 
-   fsDataInputStream = 
keyGroupsHandle.getStateHandle().openInputStream();
+   fsDataInputStream = 
keyGroupsHandle.openInputStream();

cancelStreamRegistry.registerClosable(fsDataInputStream);
 
DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
 
int numKvStates = inView.readShort();
 
-   Map kvStatesById = new 
HashMap<>(numKvStates);
-
for (int i = 0; i < numKvStates; ++i) {
String stateName = inView.readUTF();
 
-   TypeSerializer namespaceSerializer =
+   TypeSerializer namespaceSerializer =

InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
-   TypeSerializer stateSerializer =
+   TypeSerializer stateSerializer =

InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
 
-   StateTable stateTable = new 
StateTable(stateSerializer,
-   namespaceSerializer,
-   keyGroupRange);
-   stateTables.put(stateName, stateTable);
-   kvStatesById.put(i, stateName);
+   StateTable stateTable = 
stateTables.get(stateName);
+
+   if (null == stateTable) {
--- End diff --

Let's add a comment here that it is important to check for previously 
restored state first.
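
The reason for the check can be sketched as follows: on scale-in, one backend restores from several handles, so a table for a given state name may already exist when the next handle is read. The class below is a hypothetical simplification (`RestoreMergeSketch`, plain nested maps instead of StateTable), not the actual backend code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified restore logic; nested maps stand in for state tables.
final class RestoreMergeSketch {
    // stateName -> (key -> value)
    private final Map<String, Map<Integer, String>> stateTables = new HashMap<>();

    // Called once per state handle being restored.
    void restoreHandle(String stateName, Map<Integer, String> entries) {
        // Check for previously restored state first; creating a fresh table
        // unconditionally would discard what earlier handles restored.
        Map<Integer, String> table = stateTables.get(stateName);
        if (table == null) {
            table = new HashMap<>();
            stateTables.put(stateName, table);
        }
        table.putAll(entries);
    }

    int size(String stateName) {
        Map<Integer, String> table = stateTables.get(stateName);
        return table == null ? 0 : table.size();
    }
}
```

Restoring two handles for the same state name then yields a table holding the entries of both, rather than only those of the last handle.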




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82213262
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 ---
@@ -663,28 +826,42 @@ public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws
public void restoreState(Integer state) throws Exception {
counter = state;
}
+   }
+
+   private static class PartitionedStateSource extends StateSourceBase 
implements ListCheckpointed {
+
+   private static final long serialVersionUID = 
-359715965103593462L;
+   private static final int NUM_PARTITIONS = 7;
+
+   private static int[] CHECK_CORRECT_SNAPSHOT;
+   private static int[] CHECK_CORRECT_RESTORE;
 
@Override
-   public void run(SourceContext ctx) throws Exception {
-   final Object lock = ctx.getCheckpointLock();
+   public List snapshotState(long checkpointId, long 
timestamp) throws Exception {
 
-   while (running) {
-   synchronized (lock) {
-   counter++;
+   
CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 
-   ctx.collect(counter * 
getRuntimeContext().getIndexOfThisSubtask());
-   }
+   int div = counter / NUM_PARTITIONS;
+   int mod = counter % NUM_PARTITIONS;
 
-   Thread.sleep(2);
-   if(counter == 10) {
-   workStartedLatch.countDown();
+   List split = new ArrayList<>();
+   for (int i = 0; i < NUM_PARTITIONS; ++i) {
+   int partitionValue = div;
+   if (mod > 0) {
+   --mod;
+   ++partitionValue;
}
+   split.add(partitionValue);
}
+   return split;
}
 
@Override
-   public void cancel() {
-   running = false;
+   public void restoreState(List state) throws Exception {
+   for(Integer v : state) {
--- End diff --

Missing whitespace after `for`
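
For readers following the test code in the diff above, the splitting done in `snapshotState` can be tried in isolation. This is a standalone sketch with hypothetical names (`PartitionSplitSketch`, `split`, `sum`) and it assumes a non-negative counter. The diff is cut off before the body of `restoreState`, so the summing shown here is only a guess at how the parts are recombined.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of splitting a counter into near-equal parts.
final class PartitionSplitSketch {

    // Splits counter into numPartitions values that differ by at most one
    // and add up to counter again (assumes counter >= 0).
    static List<Integer> split(int counter, int numPartitions) {
        int div = counter / numPartitions;
        int mod = counter % numPartitions;
        List<Integer> parts = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; ++i) {
            int partitionValue = div;
            if (mod > 0) {
                --mod;
                ++partitionValue;
            }
            parts.add(partitionValue);
        }
        return parts;
    }

    // Assumed counterpart on the restore side: add the parts back up.
    static int sum(List<Integer> parts) {
        int total = 0;
        for (int v : parts) {
            total += v;
        }
        return total;
    }
}
```

For example, `split(10, 7)` gives `[2, 2, 2, 1, 1, 1, 1]`, which sums to 10 again however the parts are redistributed across subtasks.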




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82217757
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 ---
@@ -20,62 +20,49 @@
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
 
 /**
  * A state handle that contains stream state in a byte array.
  */
 public class ByteStreamStateHandle implements StreamStateHandle {
 
-   private static final long serialVersionUID = -5280226231200217594L;
+   private static final long serialVersionUID = -5280226231202517594L;
 
/**
-* the state data
+* The state data.
 */
protected final byte[] data;
 
/**
+* A unique name of by which this state handle is identified and 
compared. Like a filename, all
+* {@link ByteStreamStateHandle} with the exact same name must also 
have the exact same content in data.
+*/
+   protected final String handleName;
--- End diff --

Is this meant to be an optimization for `equals/hashCode`? If yes, has it 
been an issue before? 




[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82211635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 ---
@@ -219,7 +218,7 @@ public void write(byte[] b, int off, int len) throws 
IOException {
 
@Override
public long getPos() throws IOException {
-   return outStream == null ? pos : outStream.getPos();
+   return pos + (outStream == null ? 0 : 
outStream.getPos());
--- End diff --

This was buggy before then? Is there a test for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82211023
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 ---
@@ -297,7 +296,7 @@ public StreamStateHandle closeAndGetHandle() throws 
IOException {
if (outStream == null && pos <= 
localStateThreshold) {
closed = true;
byte[] bytes = 
Arrays.copyOf(writeBuffer, pos);
-   return new 
ByteStreamStateHandle(bytes);
+   return new 
ByteStreamStateHandle(String.valueOf(createStatePath()), bytes);
--- End diff --

Why don't we do `createStatePath().toString()`? We actually don't want a 
non-unique `"null"` String here in case `createStatePath()` returns `null` 
(which it shouldn't anyway).
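
The difference between the two calls is plain Java behaviour and can be shown without any Flink classes. `NullNameSketch`, `lenientName`, and `strictName` are hypothetical names used only for this illustration.

```java
import java.util.Objects;

// Illustrates String.valueOf(Object) versus toString() on a null reference.
final class NullNameSketch {

    // String.valueOf(Object) silently turns null into the four-character string "null".
    static String lenientName(Object path) {
        return String.valueOf(path);
    }

    // Calling toString() on null fails; requireNonNull makes that failure explicit.
    static String strictName(Object path) {
        return Objects.requireNonNull(path, "state path must not be null").toString();
    }
}
```

With the lenient variant, every handle created from a `null` path would share the name `"null"` and thus compare as equal; the strict variant fails fast with a `NullPointerException` instead.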


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-06 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2584#discussion_r82211497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 ---
@@ -290,6 +290,7 @@ public static void 
serializeStreamStateHandle(StreamStateHandle stateHandle, Dat
} else if (stateHandle instanceof ByteStreamStateHandle) {
dos.writeByte(BYTE_STREAM_STATE_HANDLE);
ByteStreamStateHandle byteStreamStateHandle = 
(ByteStreamStateHandle) stateHandle;
+   dos.writeUTF(byteStreamStateHandle.getHandleName());
--- End diff --

I think it's OK that we don't change the format version (e.g. to v2) when 
we change the serialization format in a snapshot version, right? The version 
was already updated since 1.1, which is fine.
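
To show why adding the name changes the byte layout, here is a small round-trip sketch using `DataOutputStream.writeUTF` and `DataInputStream.readUTF`. The layout after the name (an int length followed by the bytes) is an assumption made for this example; the diff only shows the added `writeUTF` call. `HandleFormatSketch` and its methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical wire layout: name (UTF), length (int), data bytes.
final class HandleFormatSketch {

    static byte[] write(String handleName, byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeUTF(handleName); // the newly added field comes first
            dos.writeInt(data.length);
            dos.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static String readName(byte[] serialized) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return dis.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] readData(byte[] serialized) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serialized))) {
            dis.readUTF(); // the reader must consume fields in the order they were written
            byte[] data = new byte[dis.readInt()];
            dis.readFully(data);
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A reader that does not expect the leading name would interpret its bytes as the length field, which is why such a change matters for already-released formats but not for one that has never shipped.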


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2584: [FLINK-4731] Bugfix for HeapKeyedStateBackend scal...

2016-10-04 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2584

[FLINK-4731] Bugfix for HeapKeyedStateBackend scale-in and additional tests 
in RescalingITCase

Restoring the HeapKeyedStateBackend was broken when parallelism is reduced: 
the restore method overwrote previously restored state.

Added scale-in testing to the RescalingITCase to catch problems like this 
in the future.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink stream-keyed-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2584.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2584


commit 12a981ee478ec6cd29d768c5e4166aad97e6e06b
Author: Stefan Richter 
Date:   2016-10-02T12:15:48Z

[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle

commit 91f76867df790eba9fec793f4e276b7b05aebb6c
Author: Stefan Richter 
Date:   2016-10-02T09:30:59Z

[hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test

commit a7614c1b583f220dcc78a8130494552bc085a13f
Author: Stefan Richter 
Date:   2016-10-02T12:19:37Z

[FLINK-4730] Introducing CheckpointMetaData

commit c29804d95e5baa52f6ce47a7a1546ec3797fa9d2
Author: Stefan Richter 
Date:   2016-10-02T14:56:41Z

[FLINK-4731] Bugfix for HeapKeyedStateBackend scale-in and additional tests 
in RescalingITCase



