[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-04-02 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5732


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176219997
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
 ---
@@ -121,13 +117,9 @@ public void testValueStateDescriptorAutoSerializer() 
throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testSerializerDuplication() {
-   TypeSerializer statefulSerializer = 
mock(TypeSerializer.class);
-   when(statefulSerializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

Same comment as above.


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176220974
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
protected TypeSerializer serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
--- End diff --

nit: I think this was also a copying error in the original comment, but 
this is not necessarily a "value", unless we simply think of all state as a 
value, in which case I'm fine with this.


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176220075
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
 ---
@@ -129,23 +125,12 @@ public void testMapStateDescriptorAutoSerializer() 
throws Exception {
 * Tests that the returned serializer is duplicated. This allows to
 * share the state descriptor.
 */
-   @SuppressWarnings("unchecked")
@Test
public void testSerializerDuplication() {
-   TypeSerializer keySerializer = 
mock(TypeSerializer.class);
-   TypeSerializer valueSerializer = 
mock(TypeSerializer.class);
-   when(keySerializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
-   when(valueSerializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

See above


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176220108
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
 ---
@@ -118,17 +115,14 @@ public void testValueStateDescriptorAutoSerializer() 
throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testSerializerDuplication() {
-   TypeSerializer statefulSerializer = 
mock(TypeSerializer.class);
-   when(statefulSerializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
-
-   ReduceFunction reducer = mock(ReduceFunction.class);
-
-   ReducingStateDescriptor descr = new 
ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

See above


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176220129
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
 ---
@@ -149,13 +145,9 @@ public void testVeryLargeDefaultValue() throws 
Exception {
@SuppressWarnings("unchecked")
@Test
public void testSerializerDuplication() {
-   TypeSerializer statefulSerializer = 
mock(TypeSerializer.class);
-   when(statefulSerializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

See above


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176222855
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 ---
@@ -130,6 +135,31 @@ public void 
testInitializeSerializerAfterSerializationWithCustomConfig() throws
.getRegistration(File.class).getId() > 0);
}
 
+   // 

+   //  Tests for serializer initialization
+   // 

+
+   /**
+* FLINK-6775, tests that the returned serializer is duplicated.
+* This allows to share the state descriptor across threads.
+*/
+   @Test
+   public void testSerializerDuplication() throws Exception {
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

Same as above, we should assert that assumption


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176219956
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
 ---
@@ -41,16 +40,11 @@
 * Tests that the returned serializer is duplicated. This allows to
 * share the state descriptor.
 */
-   @SuppressWarnings("unchecked")
@Test
public void testSerializerDuplication() {
-   TypeSerializer serializer = mock(TypeSerializer.class);
-   when(serializer.duplicate()).thenAnswer(new 
Answer() {
-   @Override
-   public TypeSerializer answer(InvocationOnMock 
invocation) throws Throwable {
-   return mock(TypeSerializer.class);
-   }
-   });
+   // we need a serializer that actually duplicates for testing (a 
stateful one)
--- End diff --

Will this condition always hold? Should we maybe guard this assumption with 
an assertion, i.e. assert that the result of `serialiser.duplicate()` is 
different from the original serialiser?
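
A guard of that kind could look roughly like the sketch below. It is not code from the pull request: `Serializer`, `StatelessSerializer`, `StatefulSerializer`, and `requireDuplicating` are hypothetical stand-ins for Flink's `TypeSerializer` hierarchy, used only to keep the example self-contained.

```java
class DuplicationGuard {

    /** Hypothetical stand-in for a serializer type that offers duplicate(). */
    interface Serializer {
        Serializer duplicate();
    }

    /** Stateless: safe to share, so duplicate() may return the same instance. */
    static final class StatelessSerializer implements Serializer {
        @Override
        public Serializer duplicate() {
            return this;
        }
    }

    /** Stateful: hands out a fresh instance on every duplicate() call. */
    static final class StatefulSerializer implements Serializer {
        @Override
        public Serializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Fails fast if the serializer chosen for the test does not really duplicate. */
    static Serializer requireDuplicating(Serializer serializer) {
        if (serializer.duplicate() == serializer) {
            throw new IllegalArgumentException(
                "test precondition violated: duplicate() returned the same instance");
        }
        return serializer;
    }

    public static void main(String[] args) {
        // Accepted: the stateful serializer returns a distinct copy.
        requireDuplicating(new StatefulSerializer());
        try {
            // Rejected: a test built on this serializer would prove nothing.
            requireDuplicating(new StatelessSerializer());
        } catch (IllegalArgumentException e) {
            System.out.println("guard rejected serializer: " + e.getMessage());
        }
    }
}
```

With such a check at the top of each `testSerializerDuplication()`, the test would fail loudly if the chosen serializer ever became stateless, instead of silently passing.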


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176221943
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -249,12 +257,13 @@ public boolean isSerializerInitialized() {
 */
public void initializeSerializerUnlessSet(ExecutionConfig 
executionConfig) {
--- End diff --

This is slightly orthogonal to this change, but: could we get rid of this 
method and instead change `getSerializer()` to 
`getSerializer(ExecutionConfig)`? That way, we would not have to worry about 
forgetting to call `initializeSerializerUnlessSet()`.
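
To make the suggestion concrete, here is a minimal sketch of a getter that initializes lazily from the configuration it is given. All names (`Config`, `Descriptor`, the factory function) are hypothetical, and a plain `String` stands in for the serializer; this is not Flink's actual API.

```java
import java.util.function.Function;

class LazyDescriptorSketch {

    /** Hypothetical stand-in for ExecutionConfig. */
    static final class Config {
        final String name;

        Config(String name) {
            this.name = name;
        }
    }

    /** Hypothetical descriptor whose serializer is created on first access. */
    static final class Descriptor {
        private final Function<Config, String> serializerFactory;
        private String serializer; // null until the first getSerializer(config) call

        Descriptor(Function<Config, String> serializerFactory) {
            this.serializerFactory = serializerFactory;
        }

        /** Initializes the serializer unless it is already set, then returns it. */
        String getSerializer(Config config) {
            if (serializer == null) {
                serializer = serializerFactory.apply(config);
            }
            return serializer;
        }
    }

    public static void main(String[] args) {
        Descriptor descr = new Descriptor(cfg -> "serializer-for-" + cfg.name);
        // No separate initialization call is needed before the getter.
        System.out.println(descr.getSerializer(new Config("job-a")));
    }
}
```

In this sketch the first configuration wins, mirroring the "unless set" semantics: later calls return the already created serializer regardless of the argument.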


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176006260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
protected TypeSerializer serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

good catch, will fix that upon merging


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r175983599
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
protected TypeSerializer serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

nit: The type information is no longer dropped during serialization; it is 
now dropped in `initializeSerializerUnlessSet()`.


---


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-20 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5732

[FLINK-9034] [FLINK-9035] [core] Fix state descriptors

## What is the purpose of the change

Fixes two issues with the `StateDescriptor`s that are used to obtain state 
access in transformation functions: 

### Broken Equals and hashCode

`equals()` and `hashCode()` depend on fields that are not always set and 
that may change during the life of a state descriptor. That is especially 
problematic because the state descriptors are keys in a map, and if the 
meaning of `equals()` and `hashCode()` changes after insertion, the objects 
become keys that cannot be referenced / matched.
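
The map-key hazard can be reproduced with plain Java collections. The sketch below is illustrative only: `Key` is a hypothetical class (not a Flink type) whose `hashCode()` includes a field that is assigned after the key has been inserted, and a `String` stands in for the serializer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MutableKeyDemo {

    /** Hypothetical key whose equals()/hashCode() depend on a late-initialized field. */
    static final class Key {
        final String name;
        String serializer; // null until "initialized"

        Key(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return name.equals(other.name) && Objects.equals(serializer, other.serializer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, serializer);
        }
    }

    /** Inserts a key, mutates it, and reports whether the map can still find it. */
    static boolean lookupSurvivesInitialization() {
        Map<Key, String> states = new HashMap<>();
        Key key = new Key("name");
        states.put(key, "some state");

        key.serializer = "StringSerializer"; // hashCode() changes after insertion

        return states.containsKey(key);
    }

    public static void main(String[] args) {
        System.out.println("key still found: " + lookupSurvivesInitialization());
    }
}
```

The entry is still stored in the map, but the lookup compares against the hash recorded at insertion time, so even the very same key object no longer matches.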

This pull request changes `equals()` and `hashCode()` to take only the state 
name and the descriptor type (by class) into account, both of which are 
constant and do not change as part of serializer initialization.

**Illustration of the problem:**

The following code fails with a `NullPointerException`, because the 
`hashCode()` method tries to access the serializer field, which may be 
uninitialized at that point.

```java
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
descr.hashCode(); // exception
```

The equals() method is equally broken (no pun intended):
```java
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b); // exception
b.equals(a); // exception
a.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b); // false
b.equals(a); // exception
b.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b); // true
b.equals(a); // true
```
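
For comparison, equality based only on the state name and the descriptor class, as described above, could be sketched as follows. The classes are hypothetical simplifications rather than the actual `StateDescriptor` code, and the hash formula is just one reasonable choice.

```java
import java.util.HashMap;
import java.util.Map;

class StableKeyDemo {

    /** Hypothetical descriptor base class with identity defined by name and class. */
    static class Descriptor {
        private final String name;
        private String serializer; // set lazily; deliberately ignored by equals()/hashCode()

        Descriptor(String name) {
            this.name = name;
        }

        void initializeSerializer(String serializer) {
            this.serializer = serializer;
        }

        boolean isSerializerInitialized() {
            return serializer != null;
        }

        @Override
        public final boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return name.equals(((Descriptor) o).name);
        }

        @Override
        public final int hashCode() {
            // Both inputs are fixed at construction time.
            return name.hashCode() + 31 * getClass().hashCode();
        }
    }

    static final class ValueDescriptor extends Descriptor {
        ValueDescriptor(String name) {
            super(name);
        }
    }

    static final class ListDescriptor extends Descriptor {
        ListDescriptor(String name) {
            super(name);
        }
    }

    public static void main(String[] args) {
        Map<Descriptor, String> states = new HashMap<>();
        Descriptor a = new ValueDescriptor("name");
        states.put(a, "some state");
        a.initializeSerializer("StringSerializer");
        // An equal descriptor still finds the entry after initialization.
        System.out.println(states.get(new ValueDescriptor("name")));
    }
}
```

Because neither input to `hashCode()` can change after construction, a descriptor stays a valid map key whether or not its serializer has been initialized.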

### Type Information dropped prematurely

The following code is currently problematic:
```java
// The type arguments of RichMapFunction and its map(...) method are omitted here.
public class MyFunction extends RichMapFunction {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }
}
```

The problem is that the state descriptor drops the type information and 
creates a serializer before serialization, as part of shipping the function to 
the cluster. To do that, it initializes the serializer with an empty execution 
config, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information 
before shipping was necessary, because the type info was not serializable. It 
now is, and we can fix that bug.
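
A toy model of this inconsistency, under simplifying assumptions: `Config` and `Descriptor` below are hypothetical stand-ins (not Flink classes), and the set of registered type names stands in for whatever the serializer derives from the execution config.

```java
import java.util.HashSet;
import java.util.Set;

class PrematureInitDemo {

    /** Hypothetical stand-in for ExecutionConfig with user-registered types. */
    static final class Config {
        final Set<String> registeredTypes = new HashSet<>();
    }

    /** Hypothetical descriptor; the "serializer" is the set of types it knows. */
    static final class Descriptor {
        private Set<String> serializerKnownTypes; // null until initialized

        void initializeSerializerUnlessSet(Config config) {
            if (serializerKnownTypes == null) {
                serializerKnownTypes = new HashSet<>(config.registeredTypes);
            }
        }

        boolean serializerKnows(String type) {
            return serializerKnownTypes != null && serializerKnownTypes.contains(type);
        }
    }

    /** Returns whether the job's registrations reach the serializer. */
    static boolean jobConfigHonored(boolean initializeBeforeShipping) {
        Config jobConfig = new Config();
        jobConfig.registeredTypes.add("MyType");

        Descriptor descr = new Descriptor();
        if (initializeBeforeShipping) {
            // Models the problematic path: an empty config is used before shipping.
            descr.initializeSerializerUnlessSet(new Config());
        }
        // Later, at runtime, the real config arrives but may come too late.
        descr.initializeSerializerUnlessSet(jobConfig);
        return descr.serializerKnows("MyType");
    }

    public static void main(String[] args) {
        System.out.println("premature init honors job config: " + jobConfigHonored(true));
        System.out.println("lazy init honors job config:      " + jobConfigHonored(false));
    }
}
```

Once the serializer has been created from the empty config, the later call with the real config is a no-op in this model, which is the kind of inconsistency that keeping the type information until runtime avoids.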

## Verifying this change

**This change is sensitive, because it touches the structures that all 
users use to obtain access to persistent state.**

This change adds a series of unit tests to validate the fixed behavior.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no) - **All changes should preserve full API 
compatibility.**
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): 
**Touches the structures that give access to checkpointed state.**
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5732


commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen 
Date:   2018-03-20T14:15:08Z

[hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen 
Date:   2018-03-20T14:29:12Z

[hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen 
Date:   2018-03-20T14:43:33Z

[hotfix] [core] Add @FunctionalInterface to