[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...

2018-03-09 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5645


---



2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() and
+   // deserialize() do the checks and the current mechanism does not handle
--- End diff --

oh, right, fixing...


---



2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172922843
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() and
+   // deserialize() do the checks and the current mechanism does not handle
--- End diff --

looks like the end of the comment is missing


---



2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172911926
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   if (CONCURRENT_ACCESS_CHECK) {
+   enterExclusiveThread();
+   }
+
+   try {
+   T value = deserialize(source);
--- End diff --

An alternative would be to do the following, but I would honestly not do 
that. This is a heuristic anyways (otherwise we would need to use CAS on the 
`currentThread` field).

```java
private void enterExclusiveThread() {
    Thread previous = currentThread;
    Thread thisThread = Thread.currentThread();
    if (previous == null) {
        reEntranceDepth = 1;
        currentThread = thisThread;
    }
    else if (previous == thisThread) {
        reEntranceDepth++;
    }
    else {
        throw new IllegalStateException(
                "Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
                " , Thread 2: " + previous.getName());
    }
}

private void exitExclusiveThread() {
    if (--reEntranceDepth == 0) {
        currentThread = null;
    }
}
```


---



2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172907369
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   if (CONCURRENT_ACCESS_CHECK) {
+   enterExclusiveThread();
+   }
+
+   try {
+   T value = deserialize(source);
--- End diff --

I think we can simply remove the check around the `copy(DataInputView 
source, DataOutputView target)` because the respective critical parts 
(`serialize()` and `deserialize()`) are covered already.
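A minimal sketch of that layout, assuming a hypothetical `GuardedLeafCallsDemo` class in place of the real serializer (the byte/`String` payload and the `isIdle()` helper are illustrative only, and the real code additionally wraps the guard calls in `if (CONCURRENT_ACCESS_CHECK)`): `copy()` carries no guard of its own, and each leaf call enters and exits the check in a `try`/`finally`.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a serializer; not the Flink class. */
final class GuardedLeafCallsDemo {

    // Thread currently inside serialize()/deserialize(), or null when idle.
    private volatile Thread currentThread;

    byte[] serialize(String value) {
        enterExclusiveThread();
        try {
            return value.getBytes(StandardCharsets.UTF_8);
        } finally {
            exitExclusiveThread();
        }
    }

    String deserialize(byte[] bytes) {
        enterExclusiveThread();
        try {
            return new String(bytes, StandardCharsets.UTF_8);
        } finally {
            exitExclusiveThread();
        }
    }

    // No guard here: both critical parts below perform their own check.
    byte[] copy(byte[] source) {
        String value = deserialize(source);
        return serialize(value);
    }

    boolean isIdle() {
        return currentThread == null;
    }

    // Simple get, check, set (no CAS): a sanity check, not a lock.
    private void enterExclusiveThread() {
        Thread previous = currentThread;
        if (previous == null) {
            currentThread = Thread.currentThread();
        } else {
            throw new IllegalStateException(
                    "Serializer already accessed by thread " + previous.getName());
        }
    }

    private void exitExclusiveThread() {
        currentThread = null;
    }
}
```

With this layout the same thread never enters the guard twice, so no re-entrance counter is needed.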


---



2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172893030
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   if (CONCURRENT_ACCESS_CHECK) {
+   enterExclusiveThread();
+   }
+
+   try {
+   T value = deserialize(source);
--- End diff --

Have to point out that after `deserialize()` the checks in `copy()` are 
ineffective, as the `currentThread` field has already been nulled. In other 
words, we guard against concurrent access before `deserialize()`, and within 
`deserialize()`/`serialize()`, but not between `deserialize()` and 
`serialize()` or after `serialize()`.

This isn't a _problem_ as all code is actually covered, but we may want to 
document that.


---



2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172510349
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -277,6 +349,26 @@ private void initializeAvro() {
this.decoder = new DataInputDecoder();
}
 
+   // 

+   //  Concurrency checks
+   // 

+
+   private void enterExclusiveThread() {
+   // we use simple get, check, set here, rather than CAS
+   // we don't need lock-style correctness, this is only a sanity-check and we thus
+   // favor speed at the cost of some false negatives in this check
+   Thread previous = currentThread;
+   if (previous == null) {
+   currentThread = Thread.currentThread();
+   } else {
--- End diff --

same as for Kryo


---



2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172509993
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -502,6 +572,22 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
// For testing
// 

 
+   private void enterExclusiveThread() {
+   // we use simple get, check, set here, rather than CAS
+   // we don't need lock-style correctness, this is only a sanity-check and we thus
+   // favor speed at the cost of some false negatives in this check
+   Thread previous = currentThread;
+   if (previous == null) {
+   currentThread = Thread.currentThread();
+   } else {
+   throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
--- End diff --

You also have to check for equality between `previous` and 
`Thread.currentThread()`, as `copy()` also calls `serialize()`/`deserialize()`.
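To illustrate the concern, here is a minimal sketch with a hypothetical `NonReentrantGuardDemo` class (not the real `KryoSerializer`; the `int` payload is a placeholder). It keeps the guard from the diff as-is and also guards `copy()`, so a single thread trips the check against itself.

```java
/** Hypothetical stand-in for a serializer; only the guard logic is modelled. */
final class NonReentrantGuardDemo {

    private volatile Thread currentThread;

    int deserialize(int in) {
        enterExclusiveThread();
        try {
            return in;
        } finally {
            exitExclusiveThread();
        }
    }

    int copy(int in) {
        enterExclusiveThread();
        try {
            // The same thread re-enters the guard here and is rejected,
            // because the check does not compare 'previous' with the caller.
            return deserialize(in);
        } finally {
            exitExclusiveThread();
        }
    }

    private void enterExclusiveThread() {
        Thread previous = currentThread;
        if (previous == null) {
            currentThread = Thread.currentThread();
        } else {
            throw new IllegalStateException(
                    "Serializer already accessed by thread " + previous.getName());
        }
    }

    private void exitExclusiveThread() {
        currentThread = null;
    }
}
```

Calling `copy()` on a fresh instance throws `IllegalStateException` although only one thread is involved; either an equality check on the thread or dropping the guard from `copy()` avoids that.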


---



2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172508524
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -502,6 +572,22 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
// For testing
// 

 
+   private void enterExclusiveThread() {
+   // we use simple get, check, set here, rather than CAS
+   // we don't need lock-style correctness, this is only a sanity-check and we thus
+   // favor speed at the cost of some false negatives in this check
+   Thread previous = currentThread;
+   if (previous == null) {
+   currentThread = Thread.currentThread();
+   } else {
+   throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
--- End diff --

We can also print the current thread for debugging purposes.


---



2018-03-06 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5645

FLINK-8876 Improve concurrent access handling in stateful serializers

## What is the purpose of the change

Help detect accidental concurrent use of serializers.

If the log level is set to `DEBUG`, or assertions are enabled (as they are 
during tests), the serializers remember the thread that is currently performing 
serialization/deserialization/copying and throw an error if another thread 
accesses the serializer at the same time.

The code that implements the check is guarded by a `static final` flag, so the 
JIT should eliminate it when the utility is not active, meaning it has zero 
cost when not used. To initialize the flag, we use the logger (relying on the 
fact that Java initializes static fields in order) and a utility class (relying 
on the fact that the utility class is initialized before the serializer class).
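A rough sketch of how such a flag could be computed, under assumptions: the class name `ConcurrencyCheckFlag` is hypothetical, and `java.util.logging` with `Level.FINE` stands in for the actual logger and its `DEBUG` level so that the example has no dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical illustration; not the Flink classes. */
final class ConcurrencyCheckFlag {

    // Declared first so it is initialized before the flag below:
    // Java runs static initializers in textual order.
    private static final Logger LOG =
            Logger.getLogger(ConcurrencyCheckFlag.class.getName());

    // Computed once at class initialization. Because the field is static final,
    // the JIT can treat it as a constant and drop branches guarded by it.
    static final boolean CONCURRENT_ACCESS_CHECK =
            LOG.isLoggable(Level.FINE) || assertionsEnabled();

    private ConcurrencyCheckFlag() {}

    /** Returns true if this class runs with assertions enabled (-ea). */
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment is only evaluated when assertions are switched on.
        assert enabled = true;
        return enabled;
    }
}
```

A serializer would then wrap its guard calls in `if (CONCURRENT_ACCESS_CHECK) { ... }`, as the diff does.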

## Brief change log

  - Add a concurrency detector to the KryoSerializer
  - Add a concurrency detector to the AvroSerializer

## Verifying this change

The change adds unit tests to validate the effectiveness of the utility.
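As an illustration of what such a test could look like (a sketch only, not the tests in this pull request): the hypothetical `BlockingSerializer` below blocks inside `serialize()` until released, and plain `CountDownLatch` instances stand in for the `BlockerSync` utility added in this change.

```java
import java.util.concurrent.CountDownLatch;

final class ConcurrentUseDetectionDemo {

    /** Hypothetical guarded resource; serialize() blocks until released. */
    static final class BlockingSerializer {
        private volatile Thread currentThread;
        final CountDownLatch entered = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        void serialize() {
            enterExclusiveThread();
            try {
                entered.countDown(); // signal: this thread is now inside
                release.await();     // stay inside until the test lets go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exitExclusiveThread();
            }
        }

        private void enterExclusiveThread() {
            Thread previous = currentThread;
            if (previous == null) {
                currentThread = Thread.currentThread();
            } else {
                throw new IllegalStateException(
                        "Serializer already accessed by thread " + previous.getName());
            }
        }

        private void exitExclusiveThread() {
            currentThread = null;
        }
    }

    /** Returns true if a second thread is rejected while the first is inside. */
    static boolean detectsConcurrentUse() {
        BlockingSerializer serializer = new BlockingSerializer();
        Thread holder = new Thread(serializer::serialize, "holder");
        holder.start();
        try {
            serializer.entered.await(); // wait until 'holder' is inside
            try {
                serializer.serialize(); // second access, from this thread
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            serializer.release.countDown(); // unblock 'holder'
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The latch makes the overlap deterministic: the second call only happens once the first thread is known to be inside the guarded section.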

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no)**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no)**
  - The serializers: **(yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): **(yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? **(yes** / no)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

Users do not need to change anything; this simply enhances test 
coverage and results in more meaningful errors during debug runs.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kryo_safety

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5645


commit 4a99516ca938bf4cf6dfcdbd0e5e4615367c8eac
Author: Stephan Ewen 
Date:   2018-03-04T11:11:29Z

[FLINK-8877] [core] Set Kryo trace if Flink log level is TRACE

commit 3f20d2098de39997b89b79366a636c4dde77dc6a
Author: Stephan Ewen 
Date:   2018-03-06T10:30:54Z

[FLINK-8878] [tests] Add BlockerSync utility

This helps to synchronize two threads of which one is expected to block
while holding a resource.

commit 727003165447ee2a925b3030f19afe4e1b3b5325
Author: Stephan Ewen 
Date:   2018-03-04T11:20:17Z

[FLINK-8878] [core] Add concurrency check Kryo Serializer on DEBUG level

commit 714ee8882f86aa2a969dc842097dc11687d04607
Author: Stephan Ewen 
Date:   2018-03-06T10:21:08Z

[FLINK-8879] [avro] Add concurrency check Avro Serializer on DEBUG level.




---