[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3143 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r97065423

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

```diff
@@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
 				namespaceSerializer);
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-		return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+		// we cannot reuse the keySerializationStream member since this method
+		// is called concurrently to the other ones and it may thus contain garbage
+		ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+			new ByteArrayOutputStreamWithPos(128);
+		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
+			new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+			tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+
+		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
 	}

 	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-		writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+		writeKeyWithGroupAndNamespace(
+			backend.getCurrentKeyGroupIndex(),
+			backend.getCurrentKey(),
+			currentNamespace,
+			this.keySerializationStream,
+			this.keySerializationDateDataOutputView);
 	}

-	protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+	protected void writeKeyWithGroupAndNamespace(
+			int keyGroup, K key, N namespace,
+			final ByteArrayOutputStreamWithPos keySerializationStream,
+			final DataOutputView keySerializationDateDataOutputView) throws IOException {
+
 		keySerializationStream.reset();
-		writeKeyGroup(keyGroup);
-		writeKey(key);
-		writeNameSpace(namespace);
+
+		writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+		writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
+		writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
 	}

-	private void writeKeyGroup(int keyGroup) throws IOException {
+	private void writeKeyGroup(
+			int keyGroup, final DataOutputView keySerializationDateDataOutputView)
+			throws IOException {
+
 		for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
 			keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
 		}
 	}

-	private void writeKey(K key) throws IOException {
+	private void writeKey(
+			K key, final ByteArrayOutputStreamWithPos keySerializationStream,
+			final DataOutputView keySerializationDateDataOutputView) throws IOException {
+
 		//write key
 		int beforeWrite = keySerializationStream.getPosition();
 		backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);

 		if (ambiguousKeyPossible) {
 			//write size of key
-			writeLengthFrom(beforeWrite);
+			writeLengthFrom(beforeWrite, keySerializationStream,
+				keySerializationDateDataOutputView);
 		}
 	}

-	private void writeNameSpace(N namespace) throws IOException {
+	private void writeNameSpace(
+			N namespace, final ByteArrayOutputStreamWithPos keySerializationStream,
+			final DataOutputView keySerializationDateDataOutputView) throws IOException {
+
 		int beforeWrite = keySerializationStream.getPosition();
 		namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);

 		if (ambiguousKeyPossible) {
 			//write length of namespace
-			writeLengthFrom(beforeWrite);
+			writeLengthFrom(beforeWrite, keySerializationStream,
+				keySerializationDateDataOutputView);
 		}
 	}

-	private void
```
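A short stand-alone sketch (hypothetical class and method names, not Flink code) of the byte layout the `writeKeyGroup` loop produces: the key-group id is written as a fixed number of bytes, most significant byte first, since `i << 3` is `i * 8` bits.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical demo class, not part of Flink.
class KeyGroupPrefixDemo {

	// Mirrors the quoted loop:
	//   for (int i = prefixBytes; --i >= 0;) writeByte(keyGroup >>> (i << 3));
	// i.e. write 'keyGroup' as 'prefixBytes' big-endian bytes.
	static byte[] writePrefix(int keyGroup, int prefixBytes) {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		for (int i = prefixBytes; --i >= 0;) {
			out.write(keyGroup >>> (i << 3)); // write(int) keeps only the low 8 bits
		}
		return out.toByteArray();
	}

	public static void main(String[] args) {
		byte[] p = writePrefix(0x0102, 2);
		System.out.println(p[0] + " " + p[1]); // prints: 1 2
	}
}
```

With two prefix bytes, key group 258 (0x0102) becomes the bytes {1, 2}; writing the most significant byte first means the serialized keys sort by key group.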
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r97059850

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(quotes the same `@@ -132,55 +132,91 @@` hunk as the previous message)
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r97049360

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on: `ByteArrayOutputStreamWithPos tmpKeySerializationStream =`)

--- End diff --

Please try to use the same style as the remainder of the project. I know that every programmer in the world has figured out the perfect code style and is very opinionated on that ;-) Keeping a coherent style in a project is worth a lot and worth compromising...
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96638594

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on the wrapped parameter list of `writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace, ...)`)

--- End diff --

it certainly helps - didn't see how to configure my IntelliJ style to automatically do that though :(
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96638710

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(quotes the earlier `@@ -132,55 +132,95 @@` revision of the same hunk)
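The patch's `writeKey`/`writeNameSpace` methods write a length field when `ambiguousKeyPossible` is set. A toy illustration (made-up string encoding, not Flink's actual serializers) of why variable-length key and namespace bytes need such length prefixes:

```java
// Hypothetical demo class, not part of Flink.
class AmbiguousKeyDemo {

	// naive composite key: key bytes directly followed by namespace bytes
	static String naive(String key, String namespace) {
		return key + namespace;
	}

	// toy length-prefixed composite: "<len>:" before each variable-length part
	static String lengthPrefixed(String key, String namespace) {
		return key.length() + ":" + key + namespace.length() + ":" + namespace;
	}

	public static void main(String[] args) {
		// two different (key, namespace) pairs collide without length fields...
		System.out.println(naive("ab", "c").equals(naive("a", "bc")));                   // true
		// ...but remain distinct once lengths are written
		System.out.println(lengthPrefixed("ab", "c").equals(lengthPrefixed("a", "bc"))); // false
	}
}
```

For fixed-length types (e.g. an int key) no such collision can occur, which is presumably why the length fields are only written when `ambiguousKeyPossible` is true.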
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96638478

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on: `ByteArrayOutputStreamWithPos tmpKeySerializationStream =`)

--- End diff --

yes, I saw very long lines here and there but imho 120 is quite long and affects readability as well - so does wrapping lines to some extent... I will go up to 90 instead
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96638451

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---

```java
	/**
	 * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
	 * accessing the state concurrently. They should not get in the way of each
	 * other.
	 */
	@Test
	@SuppressWarnings("unchecked")
	public void testValueStateRace() throws Exception {
		final AbstractKeyedStateBackend backend =
			createKeyedBackend(IntSerializer.INSTANCE);
		final Integer namespace = Integer.valueOf(1);

		final ValueStateDescriptor kvId =
			new ValueStateDescriptor<>("id", String.class);
		kvId.initializeSerializerUnlessSet(new ExecutionConfig());

		final TypeSerializer keySerializer = IntSerializer.INSTANCE;
		final TypeSerializer namespaceSerializer = IntSerializer.INSTANCE;
		final TypeSerializer valueSerializer = kvId.getSerializer();

		final ValueState state = backend
			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);

		@SuppressWarnings("unchecked")
		final KvState kvState = (KvState) state;

		/*
		 * 1) Test that ValueState#value() before and after
		 * KvState#getSerializedValue(byte[]) return the same value.
		 */

		// set some key and namespace
		final int key1 = 1;
		backend.setCurrentKey(key1);
		kvState.setCurrentNamespace(2);
		state.update("2");
		assertEquals("2", state.value());

		// query another key and namespace
		assertNull(getSerializedValue(kvState, 3, keySerializer,
			namespace, IntSerializer.INSTANCE,
			valueSerializer));

		// the state should not have changed!
		assertEquals("2", state.value());

		// re-set values
		kvState.setCurrentNamespace(namespace);

		/*
		 * 2) Test two threads concurrently using ValueState#value() and
		 * KvState#getSerializedValue(byte[]).
		 */

		// some modifications to the state
		final int key2 = 10;
		backend.setCurrentKey(key2);
		assertNull(state.value());
		assertNull(getSerializedValue(kvState, key2, keySerializer,
			namespace, namespaceSerializer, valueSerializer));
		state.update("1");

		boolean getterSuccess;
		final Throwable[] throwables = {null, null};

		final Thread getter = new Thread("State getter") {
```

--- End diff --

nice replacement - didn't know about that class
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96613474

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on the wrapped parameter list of `writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace, ...)`)

--- End diff --

Just my personal opinion, but indenting parameters differently than the method body helps readability (say two indentations for the parameter list).
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96613631

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(quotes the earlier `@@ -132,55 +132,95 @@` revision of the same hunk)
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96612773

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on: `ByteArrayOutputStreamWithPos tmpKeySerializationStream =`)

--- End diff --

You don't need to wrap that heavily, most code uses 120 characters line lengths.
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96612430

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---

(on: `final Thread getter = new Thread("State getter") {` in the new `testValueStateRace` test)

--- End diff --

How about using the `CheckedThread` to avoid the stuff with Throwable arrays, etc
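`CheckedThread`, suggested in the comment above, is a Flink test utility. As a rough sketch of the general pattern (hypothetical class, not Flink's actual implementation), a test thread that captures any failure and rethrows it on join looks like this:

```java
// Hypothetical helper, not Flink's CheckedThread.
abstract class RethrowingThread extends Thread {

	private volatile Throwable error;

	// the test body to run in this thread; may throw
	abstract void go() throws Exception;

	@Override
	public final void run() {
		try {
			go();
		} catch (Throwable t) {
			error = t; // remember the failure instead of losing it
		}
	}

	// join the thread and rethrow whatever its body threw
	final void sync() throws Exception {
		join();
		if (error != null) {
			throw new Exception("Thread '" + getName() + "' failed", error);
		}
	}
}
```

This removes the need for the `Throwable[] throwables` bookkeeping in the quoted test: each worker extends the helper, and the main test thread simply calls `sync()`, so assertion failures inside the workers actually fail the test.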
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96476317

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---

(on: `// is called concurrently to the other ones and it may this contain garbage`)

--- End diff --

this -> thus?
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3143

[FLINK-5530] fix race condition in AbstractRocksDBState#getSerializedValue

`AbstractRocksDBState#getSerializedValue()` uses the same key serialisation stream as the ordinary state access methods, but it is called in parallel during state queries, violating the assumption that only one thread accesses the stream. This may lead to wrong query results or to corrupt data while queries are executed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-5530

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3143.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3143

commit 7fa4c61a04cb96b94907e6ec3803b994d3c6643d
Author: Nico Kruber
Date: 2017-01-17T16:38:29Z

    [FLINK-5530] fix race condition in AbstractRocksDBState#getSerializedValue

    AbstractRocksDBState#getSerializedValue() uses the same key serialisation stream as the ordinary state access methods but is called in parallel during state queries, thus violating the assumption of only one thread accessing it. This may lead to either wrong results in queries or corrupt data while queries are executed.
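The race described in this PR can be re-enacted deterministically in a single thread: if a query resets the shared serialization stream while the task thread is in the middle of writing a key, the task's remaining bytes are appended after the query's data. A hypothetical stand-alone demo (made-up class and byte values, not Flink code):

```java
import java.io.ByteArrayOutputStream;

// Hypothetical demo class, not part of Flink.
class SharedStreamRaceDemo {

	// one stream shared by the "task" and the "query", like the
	// keySerializationStream member before this fix
	static final ByteArrayOutputStream sharedStream = new ByteArrayOutputStream();

	// replay the problematic interleaving step by step
	static byte[] interleavedWrite() {
		sharedStream.reset();
		sharedStream.write(0);  // task: first key-group prefix byte
		sharedStream.write(7);  // task: second key-group prefix byte
		sharedStream.reset();   // query: resets the SAME stream mid-write!
		sharedStream.write(9);  // query: writes its own key byte
		sharedStream.write(1);  // task: continues, unaware of the reset
		sharedStream.write(2);
		return sharedStream.toByteArray(); // {9, 1, 2}, not the intended {0, 7, 1, 2}
	}

	public static void main(String[] args) {
		System.out.println(interleavedWrite().length); // prints: 3
	}
}
```

The patch avoids exactly this by giving `getSerializedValue()` its own temporary `ByteArrayOutputStreamWithPos` instead of reusing the shared member.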