[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392822#comment-16392822 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Manually merged

> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392823#comment-16392823 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5645

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389934#comment-16389934 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Merging this after fixing the comment...

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389933#comment-16389933 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
     @Override
     public void copy(DataInputView source, DataOutputView target) throws IOException {
-        T value = deserialize(source);
-        serialize(value, target);
+        // we do not have concurrency checks here, because serialize() and
+        // deserialize() do the checks and the current mechanism does not handle
--- End diff --

oh, right, fixing...

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389855#comment-16389855 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172922843

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
     @Override
     public void copy(DataInputView source, DataOutputView target) throws IOException {
-        T value = deserialize(source);
-        serialize(value, target);
+        // we do not have concurrency checks here, because serialize() and
+        // deserialize() do the checks and the current mechanism does not handle
--- End diff --

looks like the end of the comment is missing

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389793#comment-16389793 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172911926

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
     @Override
     public void copy(DataInputView source, DataOutputView target) throws IOException {
-        T value = deserialize(source);
-        serialize(value, target);
+        if (CONCURRENT_ACCESS_CHECK) {
+            enterExclusiveThread();
+        }
+
+        try {
+            T value = deserialize(source);
--- End diff --

An alternative would be to do the following, but I honestly would not do that. This is a heuristic anyway (otherwise we would need to use CAS on the `currentThread` field).

```java
private void enterExclusiveThread() {
    Thread previous = currentThread;
    Thread thisThread = Thread.currentThread();
    if (previous == null) {
        reEntranceDepth = 1;
        currentThread = thisThread;
    } else if (previous == thisThread) {
        reEntranceDepth++;
    } else {
        throw new IllegalStateException(
                "Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
                " , Thread 2: " + previous.getName());
    }
}

private void exitExclusiveThread() {
    if (--reEntranceDepth == 0) {
        currentThread = null;
    }
}
```

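The comment above notes that a strict version of this check would need CAS on the `currentThread` field. For contrast, here is a minimal sketch of that stricter alternative. It is not what the PR adopted, and `CasThreadGuard`, `enter()` and `exit()` are hypothetical names used only for illustration. The sketch keeps the re-entrance depth from the proposal but claims ownership with `AtomicReference.compareAndSet`, so two threads can no longer both observe `null` and proceed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Flink code: a strict, re-entrant exclusive-thread
// guard that claims ownership with compareAndSet instead of get/check/set.
public class CasThreadGuard {

    // Owning thread, or null when the guard is free.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Nesting depth. Only the current owner touches it; ownership hand-over
    // through the AtomicReference makes earlier writes visible to the next owner.
    private int depth;

    public void enter() {
        Thread me = Thread.currentThread();
        while (true) {
            Thread current = owner.get();
            if (current == me) {
                // Nested call on the owning thread, e.g. copy() -> deserialize().
                depth++;
                return;
            }
            if (current != null) {
                throw new IllegalStateException("Concurrent access to serializer. This thread: "
                        + me.getName() + ", owning thread: " + current.getName());
            }
            if (owner.compareAndSet(null, me)) {
                depth = 1;
                return;
            }
            // Another thread claimed the guard between get() and compareAndSet();
            // loop to re-read the owner and report it.
        }
    }

    public void exit() {
        if (owner.get() != Thread.currentThread()) {
            throw new IllegalStateException("exit() called by a thread that does not own the guard");
        }
        if (--depth == 0) {
            owner.set(null);
        }
    }
}
```

The price is an atomic operation on every guarded call. The PR's own code comments explain why it stays with plain reads and writes instead: the check is only a sanity check, so it favors speed at the cost of some false negatives.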
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389779#comment-16389779 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

@zentol Have a look at how I solved that above, and see whether you agree that we have covered our bases now.

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389777#comment-16389777 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172907369

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
     @Override
     public void copy(DataInputView source, DataOutputView target) throws IOException {
-        T value = deserialize(source);
-        serialize(value, target);
+        if (CONCURRENT_ACCESS_CHECK) {
+            enterExclusiveThread();
+        }
+
+        try {
+            T value = deserialize(source);
--- End diff --

I think we can simply remove the check around the `copy(DataInputView source, DataOutputView target)` because the respective critical parts (`serialize()` and `deserialize()`) are covered already.

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389768#comment-16389768 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

I added a way to reset the flag to its original state (prior to assertion activation) and used it to test that the concurrency check is not active by default. The `KryoSerializerConcurrencyTest` tests the default test setting (concurrency checks on); the `KryoSerializerConcurrencyCheckInactiveITCase` tests the general default (concurrency checks off).

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389739#comment-16389739 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172893030

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,19 @@ public T copy(T from, T reuse) {
     @Override
     public void copy(DataInputView source, DataOutputView target) throws IOException {
-        T value = deserialize(source);
-        serialize(value, target);
+        if (CONCURRENT_ACCESS_CHECK) {
+            enterExclusiveThread();
+        }
+
+        try {
+            T value = deserialize(source);
--- End diff --

I have to point out that after `deserialize()`, the checks in `copy()` are ineffective, as the `currentThread` field has already been nulled. In other words, we guard against concurrent access before `deserialize()` and within `deserialize()`, but not between `deserialize()` and `serialize()`, or after `serialize()`. This isn't a _problem_, as all code is actually covered, but we may want to document that.

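The gap described in this review comment can be reproduced with a small model. The sketch rests on an assumption: that the re-entrant check at this stage of the review tolerated nested calls from the same thread but cleared `currentThread` unconditionally on exit, which is what the comment implies. `GuardGapDemo` and its methods are hypothetical stand-ins for the serializer, not Flink code. Running `main` prints `false`, because once the nested `deserialize()` returns, `copy()` no longer holds the guard.

```java
// Hypothetical model, not Flink code: a check that tolerates nested calls from
// the same thread but clears the owner on every exit.
public class GuardGapDemo {

    private Thread currentThread;

    // Records whether copy() still held the guard between its two nested calls.
    private boolean heldBetweenCalls;

    private void enterExclusiveThread() {
        Thread previous = currentThread;
        Thread thisThread = Thread.currentThread();
        if (previous == null) {
            currentThread = thisThread;
        } else if (previous != thisThread) {
            throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
        }
        // previous == thisThread: nested call, tolerated but not counted.
    }

    private void exitExclusiveThread() {
        // Unconditional: an inner exit also releases the outer caller's claim.
        currentThread = null;
    }

    public String deserialize() {
        enterExclusiveThread();
        try {
            return "value";
        } finally {
            exitExclusiveThread();
        }
    }

    public void serialize(String value) {
        enterExclusiveThread();
        try {
            // Writing 'value' would happen here.
        } finally {
            exitExclusiveThread();
        }
    }

    public void copy() {
        enterExclusiveThread();
        try {
            String value = deserialize();
            // deserialize() has already nulled currentThread, so another thread
            // entering at this point would not be detected.
            heldBetweenCalls = currentThread != null;
            serialize(value);
        } finally {
            exitExclusiveThread();
        }
    }

    public boolean wasHeldBetweenCalls() {
        return heldBetweenCalls;
    }

    public static void main(String[] args) {
        GuardGapDemo demo = new GuardGapDemo();
        demo.copy();
        System.out.println("guard held between deserialize() and serialize(): "
                + demo.wasHeldBetweenCalls());
    }
}
```

The thread discusses two ways out. Counting the nesting depth keeps the guard held across the nested calls. Dropping the check from `copy()` leaves the guarded `serialize()` and `deserialize()` to cover the critical parts.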
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389659#comment-16389659 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Ah, I see. Let me think whether there is an easy way to do that...

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388266#comment-16388266 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5645

The end-to-end test would be cheap, as we wouldn't execute a Flink job or even start a Flink cluster. The scripts in `flink-end-to-end-tests` can do pretty much anything they want; my idea was to just call the test method in a non-test context, in which case the test should throw an AssertionError. It may not be the cleanest thing to do, though, as we would have to mix test and production code in a single jar.

My goal was to verify that the sections guarded by `CONCURRENT_ACCESS_CHECK` are skipped at runtime by default. As it stands, someone could just hard-code it to `true` and there's no test preventing that.

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388049#comment-16388049 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Making the concurrent access check re-entrant fixes all tests.

My feeling would be not to add an end-to-end test for this, because end-to-end tests are quite expensive. In your opinion, is this mainly for validating that the check works when it is activated by configuring the log level?

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387845#comment-16387845 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Thanks for the great review. Good catch, I bet the missing check against the current thread is the reason for the test failure.

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387730#comment-16387730 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172510349

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -277,6 +349,26 @@ private void initializeAvro() {
         this.decoder = new DataInputDecoder();
     }
 
+    //
+    // Concurrency checks
+    //
+
+    private void enterExclusiveThread() {
+        // we use simple get, check, set here, rather than CAS
+        // we don't need lock-style correctness, this is only a sanity-check and we thus
+        // favor speed at the cost of some false negatives in this check
+        Thread previous = currentThread;
+        if (previous == null) {
+            currentThread = Thread.currentThread();
+        } else {
--- End diff --

Same as for Kryo.

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387729#comment-16387729 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172509993

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -502,6 +572,22 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
     // For testing
     //
 
+    private void enterExclusiveThread() {
+        // we use simple get, check, set here, rather than CAS
+        // we don't need lock-style correctness, this is only a sanity-check and we thus
+        // favor speed at the cost of some false negatives in this check
+        Thread previous = currentThread;
+        if (previous == null) {
+            currentThread = Thread.currentThread();
+        } else {
+            throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
--- End diff --

You also have to check for equality between `previous` and `Thread.currentThread()`, as `copy()` also calls `serialize()`/`deserialize()`.

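A small model shows one same-thread path that trips the original check. `copy()` enters the guard and then calls the guarded `deserialize()` on the same thread. This is a hypothetical sketch: `NestedCallDemo` is not Flink code, and its `tolerateSameThread` switch adds the equality check suggested in this comment.

```java
// Hypothetical model, not Flink code: the original non-re-entrant check versus
// one that also compares the owner with Thread.currentThread().
public class NestedCallDemo {

    private final boolean tolerateSameThread;
    private Thread currentThread;

    public NestedCallDemo(boolean tolerateSameThread) {
        this.tolerateSameThread = tolerateSameThread;
    }

    private void enterExclusiveThread() {
        Thread previous = currentThread;
        if (previous == null) {
            currentThread = Thread.currentThread();
        } else if (tolerateSameThread && previous == Thread.currentThread()) {
            // Nested call from the owning thread: not a concurrent access.
        } else {
            throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
        }
    }

    private void exitExclusiveThread() {
        currentThread = null;
    }

    public String deserialize() {
        enterExclusiveThread();
        try {
            return "value";
        } finally {
            exitExclusiveThread();
        }
    }

    public void serialize(String value) {
        enterExclusiveThread();
        try {
            // Writing 'value' would happen here.
        } finally {
            exitExclusiveThread();
        }
    }

    // Modeled on copy(DataInputView, DataOutputView): guarded itself, and
    // delegating to the guarded deserialize() and serialize().
    public void copy() {
        enterExclusiveThread();
        try {
            serialize(deserialize());
        } finally {
            exitExclusiveThread();
        }
    }

    public static void main(String[] args) {
        try {
            new NestedCallDemo(false).copy();
        } catch (IllegalStateException e) {
            System.out.println("strict check: " + e.getMessage());
        }
        new NestedCallDemo(true).copy();
        System.out.println("same-thread check: copy() completed");
    }
}
```

On the JVM's `main` thread, the strict variant fails with the same `Serializer already accessed by thread main` message that appears in the test failures reported in this thread. The tolerant variant completes.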
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387720#comment-16387720 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172508524

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -502,6 +572,22 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
     // For testing
     //
 
+    private void enterExclusiveThread() {
+        // we use simple get, check, set here, rather than CAS
+        // we don't need lock-style correctness, this is only a sanity-check and we thus
+        // favor speed at the cost of some false negatives in this check
+        Thread previous = currentThread;
+        if (previous == null) {
+            currentThread = Thread.currentThread();
+        } else {
+            throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
--- End diff --

We can also print the current thread for debugging purposes.

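The suggestion to print the current thread can be illustrated with a sketch of a two-thread check. One thread holds the serializer while a second one enters, and the error message names both threads. `TwoThreadDetectionDemo` is hypothetical. A `CountDownLatch` stands in for the PR's `BlockerSync` test utility, whose API is not shown in this thread. The latch also creates a happens-before edge, so the second thread reliably sees the first thread's write to the plain `currentThread` field.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Flink code: one thread holds the serializer while a
// second one enters, and the error message names both threads.
public class TwoThreadDetectionDemo {

    private Thread currentThread;

    private void enterExclusiveThread() {
        Thread previous = currentThread;
        Thread thisThread = Thread.currentThread();
        if (previous == null) {
            currentThread = thisThread;
        } else if (previous != thisThread) {
            throw new IllegalStateException("Concurrent access to serializer. Thread 1: "
                    + thisThread.getName() + ", Thread 2: " + previous.getName());
        }
    }

    private void exitExclusiveThread() {
        currentThread = null;
    }

    // Returns the message seen by the second thread, or null if nothing was detected.
    public static String detectConcurrentAccess() throws InterruptedException {
        TwoThreadDetectionDemo serializer = new TwoThreadDetectionDemo();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> error = new AtomicReference<>();

        Thread holder = new Thread(() -> {
            serializer.enterExclusiveThread();
            entered.countDown();
            try {
                release.await(); // keep holding the serializer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                serializer.exitExclusiveThread();
            }
        }, "holder");

        Thread intruder = new Thread(() -> {
            try {
                serializer.enterExclusiveThread();
                serializer.exitExclusiveThread();
            } catch (IllegalStateException e) {
                error.set(e.getMessage());
            }
        }, "intruder");

        holder.start();
        entered.await(); // also publishes holder's write to currentThread
        intruder.start();
        intruder.join();
        release.countDown();
        holder.join();
        return error.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(detectConcurrentAccess());
    }
}
```

Here `main` prints `Concurrent access to serializer. Thread 1: intruder, Thread 2: holder`. The message format follows the proposal quoted elsewhere in this thread.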
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387708#comment-16387708 ]

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5645

We got a bunch of failing tests:

```
Failed tests:
  MultidimensionalArraySerializerTest.testObjectArrays:84 Exception in test: Serializer already accessed by thread main
  PojoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testBeanStyleObjects:95->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  PojoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testCompositeObject:75->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  PojoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testNestedInterfaces:124->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  PojoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testNestedObjects:85->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  PojoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testSimpleTypesObjects:64->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  PojoSubclassSerializerTest>SerializerTestBase.testSerializedCopyAsSequence:402 Exception in test: Serializer already accessed by thread main
  PojoSubclassSerializerTest>SerializerTestBase.testSerializedCopyIndividually:364 Exception in test: Serializer already accessed by thread main
  SubclassFromInterfaceSerializerTest>SerializerTestBase.testSerializedCopyAsSequence:402 Exception in test: Serializer already accessed by thread main
  SubclassFromInterfaceSerializerTest>SerializerTestBase.testSerializedCopyIndividually:364 Exception in test: Serializer already accessed by thread main
  TupleSerializerTest.testTuple5CustomObjects:215->runTests:229 Exception in test: Serializer already accessed by thread main
  KryoGenericArraySerializerTest>AbstractGenericArraySerializerTest.testBeanStyleObjects:120->AbstractGenericArraySerializerTest.runTests:152->AbstractGenericArraySerializerTest.runTests:173 Exception in test: Serializer already accessed by thread main
  KryoGenericArraySerializerTest>AbstractGenericArraySerializerTest.testCompositeObject:93->AbstractGenericArraySerializerTest.runTests:152->AbstractGenericArraySerializerTest.runTests:173 Exception in test: Serializer already accessed by thread main
  KryoGenericArraySerializerTest>AbstractGenericArraySerializerTest.testNestedObjects:103->AbstractGenericArraySerializerTest.runTests:152->AbstractGenericArraySerializerTest.runTests:173 Exception in test: Serializer already accessed by thread main
  KryoGenericArraySerializerTest>AbstractGenericArraySerializerTest.testSimpleTypesObjects:82->AbstractGenericArraySerializerTest.runTests:152->AbstractGenericArraySerializerTest.runTests:173 Exception in test: Serializer already accessed by thread main
  KryoGenericArraySerializerTest>AbstractGenericArraySerializerTest.testString:63->AbstractGenericArraySerializerTest.runTests:152->AbstractGenericArraySerializerTest.runTests:173 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testBeanStyleObjects:95->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testCompositeObject:75->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest.testJavaDequeue:68->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest.testJavaList:50->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest.testJavaSet:59->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testNestedInterfaces:124->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
  KryoGenericTypeSerializerTest>AbstractGenericTypeSerializerTest.testNestedObjects:85->AbstractGenericTypeSerializerTest.runTests:155 Exception in test: Serializer already accessed by thread main
```

[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387660#comment-16387660 ]

ASF GitHub Bot commented on FLINK-8876:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5645

FLINK-8876 Improve concurrent access handling in stateful serializers

## What is the purpose of the change

Help detect accidental concurrent use of serializers.

If the log level is set to `DEBUG`, or assertions are enabled (as they are during tests), the serializers remember the thread that is currently performing serialization/deserialization/copying and throw an error if another thread accesses the serializer at the same time.

The code that implements this check is guarded by a `static final` flag, so the JIT should eliminate it when the utility is not active, meaning it has zero cost when not used.

To initialize the flag, we use the logger (relying on the fact that Java initializes static fields in order) and a utility class (relying on the fact that the utility class is initialized before the serializer class).

## Brief change log

- Add a concurrency detector to the KryoSerializer
- Add a concurrency detector to the AvroSerializer

## Verifying this change

The change adds unit tests to validate the effectiveness of the utility.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (**yes** / no / don't know)
- The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

Users do not need to change anything; this simply enhances test coverage and produces more meaningful errors during debug runs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kryo_safety

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5645

commit 4a99516ca938bf4cf6dfcdbd0e5e4615367c8eac
Author: Stephan Ewen
Date: 2018-03-04T11:11:29Z

[FLINK-8877] [core] Set Kryo trace if Flink log level is TRACE

commit 3f20d2098de39997b89b79366a636c4dde77dc6a
Author: Stephan Ewen
Date: 2018-03-06T10:30:54Z

[FLINK-8878] [tests] Add BlockerSync utility

This helps to synchronize two threads of which one is expected to block while holding a resource.

commit 727003165447ee2a925b3030f19afe4e1b3b5325
Author: Stephan Ewen
Date: 2018-03-04T11:20:17Z

[FLINK-8878] [core] Add concurrency check Kryo Serializer on DEBUG level

commit 714ee8882f86aa2a969dc842097dc11687d04607
Author: Stephan Ewen
Date: 2018-03-06T10:21:08Z

[FLINK-8879] [avro] Add concurrency check Avro Serializer on DEBUG level.

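The PR description says the check sits behind a `static final` flag initialized from the logger and a helper class. Below is a minimal sketch of that pattern, with these assumptions: `GuardedSerializerSketch` and `AssertionProbe` are hypothetical names, and `java.util.logging`'s `FINE` level stands in for the SLF4J `DEBUG` check so the example has no dependencies. The assertion probe lives in a separate nested class, in the spirit of the PR's utility class. This also sidesteps a JLS corner case: an `assert` executed before its own class has finished initializing is treated as enabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not Flink code. java.util.logging's FINE level stands in
// for SLF4J's DEBUG so that the example has no dependencies.
public class GuardedSerializerSketch {

    // Declared before the flag: static initializers run in textual order.
    private static final Logger LOG = Logger.getLogger(GuardedSerializerSketch.class.getName());

    // static final, so the JIT can treat it as a constant and drop the guarded
    // branches when it is false.
    static final boolean CONCURRENT_ACCESS_CHECK =
            LOG.isLoggable(Level.FINE) || AssertionProbe.assertionsEnabled();

    // Separate helper class: it is fully initialized before its method runs,
    // so the assert below reflects the real -ea setting.
    private static final class AssertionProbe {
        static boolean assertionsEnabled() {
            boolean enabled = false;
            assert enabled = true; // the assignment only executes when assertions are on
            return enabled;
        }
    }

    private Thread currentThread;

    public void serialize(Object value) {
        if (CONCURRENT_ACCESS_CHECK) {
            enterExclusiveThread();
        }
        try {
            // The actual (stateful) serialization of 'value' would happen here.
        } finally {
            if (CONCURRENT_ACCESS_CHECK) {
                exitExclusiveThread();
            }
        }
    }

    private void enterExclusiveThread() {
        Thread previous = currentThread;
        if (previous == null) {
            currentThread = Thread.currentThread();
        } else if (previous != Thread.currentThread()) {
            throw new IllegalStateException("Serializer already accessed by thread " + previous.getName());
        }
    }

    private void exitExclusiveThread() {
        currentThread = null;
    }

    public static void main(String[] args) {
        System.out.println("concurrency check active: " + CONCURRENT_ACCESS_CHECK);
        new GuardedSerializerSketch().serialize("value");
    }
}
```

Running with `java -ea` should report the check as active, which matches the PR's statement that the checks are on during tests. With default logging and no `-ea`, the flag is `false` and the guarded branches are dead code that the JIT may remove.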