[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3143


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-20 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r97065423
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this 
method
+   // is called concurrently to the other ones and it may thus 
contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView =
+   new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-   
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(
+   int keyGroup, K key, N namespace,
+   final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
keySerializationStream.reset();
-   writeKeyGroup(keyGroup);
-   writeKey(key);
-   writeNameSpace(namespace);
+   writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+   writeKey(key, keySerializationStream, 
keySerializationDateDataOutputView);
+   writeNameSpace(namespace, keySerializationStream, 
keySerializationDateDataOutputView);
}
 
-   private void writeKeyGroup(int keyGroup) throws IOException {
+   private void writeKeyGroup(
+   int keyGroup, final DataOutputView 
keySerializationDateDataOutputView)
+   throws IOException {
+
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
}
}
 
-   private void writeKey(K key) throws IOException {
+   private void writeKey(
+   K key, final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write size of key
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeNameSpace(N namespace) throws IOException {
+   private void writeNameSpace(
+   N namespace, final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write length of namespace
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void 

[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-20 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r97059850
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this 
method
+   // is called concurrently to the other ones and it may thus 
contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView =
+   new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-   
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(
+   int keyGroup, K key, N namespace,
+   final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
keySerializationStream.reset();
-   writeKeyGroup(keyGroup);
-   writeKey(key);
-   writeNameSpace(namespace);
+   writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+   writeKey(key, keySerializationStream, 
keySerializationDateDataOutputView);
+   writeNameSpace(namespace, keySerializationStream, 
keySerializationDateDataOutputView);
}
 
-   private void writeKeyGroup(int keyGroup) throws IOException {
+   private void writeKeyGroup(
+   int keyGroup, final DataOutputView 
keySerializationDateDataOutputView)
+   throws IOException {
+
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
}
}
 
-   private void writeKey(K key) throws IOException {
+   private void writeKey(
+   K key, final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write size of key
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeNameSpace(N namespace) throws IOException {
+   private void writeNameSpace(
+   N namespace, final ByteArrayOutputStreamWithPos 
keySerializationStream,
+   final DataOutputView 
keySerializationDateDataOutputView) throws IOException {
+
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write length of namespace
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void 

[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-20 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r97049360
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may thus contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
--- End diff --

Please try to use the same style as the remainder of the project. I know 
that every programmer in the world has figured out the perfect code style and 
is very opinionated on that ;-)

Keeping a coherent style in a project is worth a lot and worth 
compromising...


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96638594
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may thus contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
+   new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-   writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
+   N namespace,
--- End diff --

it certainly helps - didn't see how to configure my IntelliJ style to 
automatically do that though :(


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96638710
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this 
method
+   // is called concurrently to the other ones and it may thus 
contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView =
+   new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-   
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
+   N namespace,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
keySerializationStream.reset();
-   writeKeyGroup(keyGroup);
-   writeKey(key);
-   writeNameSpace(namespace);
+   writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+   writeKey(key, keySerializationStream, 
keySerializationDateDataOutputView);
+   writeNameSpace(namespace, keySerializationStream, 
keySerializationDateDataOutputView);
}
 
-   private void writeKeyGroup(int keyGroup) throws IOException {
+   private void writeKeyGroup(int keyGroup,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
}
}
 
-   private void writeKey(K key) throws IOException {
+   private void writeKey(K key,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write size of key
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeNameSpace(N namespace) throws IOException {
+   private void writeNameSpace(N namespace,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write length of namespace
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeLengthFrom(int fromPosition) throws 

[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96638478
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may thus contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
--- End diff --

yes, I saw very long lines here and there but imho 120 is quite long and
affects readability as well - so does wrapping lines to some extent... I will
go up to 90 instead


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96638451
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -242,6 +245,132 @@ public void testValueState() throws Exception {
backend.dispose();
}
 
+   /**
+* Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+* accessing the state concurrently. They should not get in the way of each
+* other.
+*/
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testValueStateRace() throws Exception {
+   final AbstractKeyedStateBackend backend =
+   createKeyedBackend(IntSerializer.INSTANCE);
+   final Integer namespace = Integer.valueOf(1);
+
+   final ValueStateDescriptor kvId =
+   new ValueStateDescriptor<>("id", String.class);
+   kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+   final TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   final TypeSerializer namespaceSerializer =
+   IntSerializer.INSTANCE;
+   final TypeSerializer valueSerializer = kvId.getSerializer();
+
+   final ValueState state = backend
+   .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
+
+   @SuppressWarnings("unchecked")
+   final KvState kvState = (KvState) state;
+
+   /**
+* 1) Test that ValueState#value() before and after
+* KvState#getSerializedValue(byte[]) return the same value.
+*/
+
+   // set some key and namespace
+   final int key1 = 1;
+   backend.setCurrentKey(key1);
+   kvState.setCurrentNamespace(2);
+   state.update("2");
+   assertEquals("2", state.value());
+
+   // query another key and namespace
+   assertNull(getSerializedValue(kvState, 3, keySerializer,
+   namespace, IntSerializer.INSTANCE,
+   valueSerializer));
+
+   // the state should not have changed!
+   assertEquals("2", state.value());
+
+   // re-set values
+   kvState.setCurrentNamespace(namespace);
+
+   /**
+* 2) Test two threads concurrently using ValueState#value() and
+* KvState#getSerializedValue(byte[]).
+*/
+
+   // some modifications to the state
+   final int key2 = 10;
+   backend.setCurrentKey(key2);
+   assertNull(state.value());
+   assertNull(getSerializedValue(kvState, key2, keySerializer,
+   namespace, namespaceSerializer, valueSerializer));
+   state.update("1");
+
+   boolean getterSuccess;
+   final Throwable[] throwables = {null, null};
+
+   final Thread getter = new Thread("State getter") {
--- End diff --

nice replacement - didn't know about that class


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96613474
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may thus contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
+   new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-   writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
+   N namespace,
--- End diff --

Just my personal opinion, but indenting parameters differently than the 
method body helps readability (say two indentations for the parameter list).


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96613631
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this 
method
+   // is called concurrently to the other ones and it may thus 
contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+   new ByteArrayOutputStreamWithPos(128);
+   DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView =
+   new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+   tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
+
+   return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
 
}
 
protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-   
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
+   writeKeyWithGroupAndNamespace(
+   backend.getCurrentKeyGroupIndex(),
+   backend.getCurrentKey(),
+   currentNamespace,
+   this.keySerializationStream,
+   this.keySerializationDateDataOutputView);
}
 
-   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
+   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
+   N namespace,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
keySerializationStream.reset();
-   writeKeyGroup(keyGroup);
-   writeKey(key);
-   writeNameSpace(namespace);
+   writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+   writeKey(key, keySerializationStream, 
keySerializationDateDataOutputView);
+   writeNameSpace(namespace, keySerializationStream, 
keySerializationDateDataOutputView);
}
 
-   private void writeKeyGroup(int keyGroup) throws IOException {
+   private void writeKeyGroup(int keyGroup,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
}
}
 
-   private void writeKey(K key) throws IOException {
+   private void writeKey(K key,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write size of key
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeNameSpace(N namespace) throws IOException {
+   private void writeNameSpace(N namespace,
+   final ByteArrayOutputStreamWithPos keySerializationStream,
+   final DataOutputView keySerializationDateDataOutputView) throws
+   IOException {
+
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
 
if (ambiguousKeyPossible) {
//write length of namespace
-   writeLengthFrom(beforeWrite);
+   writeLengthFrom(beforeWrite, keySerializationStream,
+   keySerializationDateDataOutputView);
}
}
 
-   private void writeLengthFrom(int fromPosition) throws 

[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96612773
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may thus contain garbage
+   ByteArrayOutputStreamWithPos tmpKeySerializationStream =
--- End diff --

You don't need to wrap that heavily; most code uses a line length of 120
characters.


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96612430
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -242,6 +245,132 @@ public void testValueState() throws Exception {
backend.dispose();
}
 
+   /**
+* Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+* accessing the state concurrently. They should not get in the way of each
+* other.
+*/
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testValueStateRace() throws Exception {
+   final AbstractKeyedStateBackend backend =
+   createKeyedBackend(IntSerializer.INSTANCE);
+   final Integer namespace = Integer.valueOf(1);
+
+   final ValueStateDescriptor kvId =
+   new ValueStateDescriptor<>("id", String.class);
+   kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+   final TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   final TypeSerializer namespaceSerializer =
+   IntSerializer.INSTANCE;
+   final TypeSerializer valueSerializer = kvId.getSerializer();
+
+   final ValueState state = backend
+   .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
+
+   @SuppressWarnings("unchecked")
+   final KvState kvState = (KvState) state;
+
+   /**
+* 1) Test that ValueState#value() before and after
+* KvState#getSerializedValue(byte[]) return the same value.
+*/
+
+   // set some key and namespace
+   final int key1 = 1;
+   backend.setCurrentKey(key1);
+   kvState.setCurrentNamespace(2);
+   state.update("2");
+   assertEquals("2", state.value());
+
+   // query another key and namespace
+   assertNull(getSerializedValue(kvState, 3, keySerializer,
+   namespace, IntSerializer.INSTANCE,
+   valueSerializer));
+
+   // the state should not have changed!
+   assertEquals("2", state.value());
+
+   // re-set values
+   kvState.setCurrentNamespace(namespace);
+
+   /**
+* 2) Test two threads concurrently using ValueState#value() and
+* KvState#getSerializedValue(byte[]).
+*/
+
+   // some modifications to the state
+   final int key2 = 10;
+   backend.setCurrentKey(key2);
+   assertNull(state.value());
+   assertNull(getSerializedValue(kvState, key2, keySerializer,
+   namespace, namespaceSerializer, valueSerializer));
+   state.update("1");
+
+   boolean getterSuccess;
+   final Throwable[] throwables = {null, null};
+
+   final Thread getter = new Thread("State getter") {
--- End diff --

How about using the `CheckedThread` to avoid the stuff with Throwable
arrays, etc.?
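
For readers unfamiliar with the pattern, here is a minimal sketch of a thread
that remembers what its body threw and rethrows it on the calling thread. This
is not Flink's `CheckedThread` itself; the names `ErrorCapturingThread`, `go()`,
`sync()` and `runAndReport()` are made up for illustration.

```java
final class CheckedThreadSketch {

    /** Hypothetical helper: a thread that stores any failure of its body. */
    abstract static class ErrorCapturingThread extends Thread {
        private volatile Throwable error;

        ErrorCapturingThread(String name) {
            super(name);
        }

        /** The work to run on the new thread; it may throw. */
        protected abstract void go() throws Exception;

        @Override
        public final void run() {
            try {
                go();
            } catch (Throwable t) {
                error = t; // remembered for the thread that calls sync()
            }
        }

        /** Waits for the thread to finish and rethrows whatever go() threw. */
        void sync() throws Exception {
            join();
            Throwable t = error;
            if (t instanceof Exception) {
                throw (Exception) t;
            }
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
    }

    /** Runs a worker that optionally fails and reports what sync() surfaced. */
    static String runAndReport(final boolean fail) {
        ErrorCapturingThread worker = new ErrorCapturingThread("State getter") {
            @Override
            protected void go() {
                if (fail) {
                    throw new IllegalStateException("unexpected value");
                }
            }
        };
        worker.start();
        try {
            worker.sync();
            return "ok";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(false));
        System.out.println(runAndReport(true));
    }
}
```

With a helper like this, the test only has to call `sync()` on each thread, and
no shared `Throwable[]` is needed to carry failures back.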


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3143#discussion_r96476317
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
 
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-   writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-   return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+   // we cannot reuse the keySerializationStream member since this method
+   // is called concurrently to the other ones and it may this contain garbage
--- End diff --

this -> thus?


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-17 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3143

[FLINK-5530] fix race condition in AbstractRocksDBState#getSerializedValue

`AbstractRocksDBState#getSerializedValue()` uses the same key serialisation
stream as the ordinary state access methods but is called in parallel during
state queries thus violating the assumption of only one thread accessing it.

This may lead to either wrong results in queries or corrupt data while
queries are executed.
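
To make the hazard concrete, here is a small sketch that replays one possible
interleaving on a shared, reusable buffer in a single thread, so the outcome is
reproducible. It is not the Flink code: the class name, the one-byte key group
and the four-byte key layout are simplifying assumptions, and a plain
`java.io.ByteArrayOutputStream` stands in for the member stream.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

final class SharedKeyBufferSketch {

    // Hypothetical stand-in for the reusable member stream.
    private final ByteArrayOutputStream shared = new ByteArrayOutputStream(128);

    /** Writes an int as four big-endian bytes. */
    private static void writeInt(ByteArrayOutputStream out, int value) {
        for (int shift = 24; shift >= 0; shift -= 8) {
            out.write(value >>> shift); // write(int) keeps only the low 8 bits
        }
    }

    /** Safe variant: every call serializes into its own buffer. */
    static byte[] encodeWithLocalBuffer(int keyGroup, int key) {
        ByteArrayOutputStream local = new ByteArrayOutputStream(128);
        local.write(keyGroup);
        writeInt(local, key);
        return local.toByteArray();
    }

    /**
     * Replays one interleaving of a task thread (key group 1) and a query
     * thread (key group 2) that both use the shared buffer.
     */
    byte[] replayInterleaving(int taskKey, int queryKey) {
        shared.reset();              // task thread starts serializing
        shared.write(1);             // task thread: key group
        shared.reset();              // query thread starts and wipes the buffer
        shared.write(2);             // query thread: key group
        writeInt(shared, queryKey);  // query thread: key
        writeInt(shared, taskKey);   // task thread resumes and appends its key
        return shared.toByteArray(); // 9 mixed bytes instead of the task's 5
    }

    public static void main(String[] args) {
        SharedKeyBufferSketch sketch = new SharedKeyBufferSketch();
        System.out.println("expected: " + Arrays.toString(encodeWithLocalBuffer(1, 7)));
        System.out.println("mixed:    " + Arrays.toString(sketch.replayInterleaving(7, 9)));
    }
}
```

Giving the query path its own temporary buffer, as `encodeWithLocalBuffer` does
here, removes the shared mutable state at the cost of one small allocation per
query.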

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5530

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3143


commit 7fa4c61a04cb96b94907e6ec3803b994d3c6643d
Author: Nico Kruber 
Date:   2017-01-17T16:38:29Z

[FLINK-5530] fix race condition in AbstractRocksDBState#getSerializedValue

AbstractRocksDBState#getSerializedValue() uses the same key serialisation
stream as the ordinary state access methods but is called in parallel during
state queries thus violating the assumption of only one thread accessing it.

This may lead to either wrong results in queries or corrupt data while
queries are executed.



