[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-11 Thread Sihua Zhou (JIRA)


 [ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou reassigned FLINK-9804:
---------------------------------

Assignee: Sihua Zhou  (was: vinoyang)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>     final int namespace1ElementsNum = 1000;
>     final int namespace2ElementsNum = 1000;
>     String fieldName = "get-keys-test";
>     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         final String ns1 = "ns1";
>         MapState<String, Integer> keyedState1 = backend.getPartitionedState(
>             ns1,
>             StringSerializer.INSTANCE,
>             new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>         );
>         // two map entries per key in namespace 1
>         for (int key = 0; key < namespace1ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState1.put("he", key * 2);
>             keyedState1.put("ho", key * 2);
>         }
>         final String ns2 = "ns2";
>         MapState<String, Integer> keyedState2 = backend.getPartitionedState(
>             ns2,
>             StringSerializer.INSTANCE,
>             new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>         );
>         // two map entries per key in namespace 2
>         for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState2.put("he", key * 2);
>             keyedState2.put("ho", key * 2);
>         }
>         // valid for namespace1
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
>             PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey, actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>         // valid for namespace2
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
>             PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey, actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>     }
>     finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}
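
The test above writes two map entries ("he" and "ho") under every key and then expects getKeys() to return each key exactly once per namespace. The report does not say how the RocksDB backend fails, so the following is only a toy model in plain Java, not Flink or RocksDB code. The class GetKeysSketch, its Entry layout, and the methods naiveKeys and distinctKeys are all hypothetical. It sketches one plausible failure mode, assumed here for illustration: if every map entry is stored under its own (namespace, key, user key) record, a scan that emits one key per stored record yields each key twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of keyed map state. Hypothetical; not Flink's or RocksDB's implementation. */
public class GetKeysSketch {

    /** One stored map entry, addressed by (namespace, key, userKey). Assumed layout. */
    private static final class Entry {
        final String namespace;
        final int key;
        final String userKey;
        final int value;

        Entry(String namespace, int key, String userKey, int value) {
            this.namespace = namespace;
            this.key = key;
            this.userKey = userKey;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Stores one map entry for the given key and namespace. */
    public void put(String namespace, int key, String userKey, int value) {
        entries.add(new Entry(namespace, key, userKey, value));
    }

    /** Naive scan: emits one key per stored entry, so keys with several map entries repeat. */
    public Stream<Integer> naiveKeys(String namespace) {
        return entries.stream()
            .filter(e -> e.namespace.equals(namespace))
            .map(e -> e.key);
    }

    /** What the test expects: every key of the namespace exactly once. */
    public Stream<Integer> distinctKeys(String namespace) {
        return naiveKeys(namespace).distinct();
    }

    public static void main(String[] args) {
        GetKeysSketch state = new GetKeysSketch();
        for (int key = 0; key < 3; key++) {
            state.put("ns1", key, "he", key * 2);
            state.put("ns1", key, "ho", key * 2);
        }
        // prints [0, 0, 1, 1, 2, 2]
        System.out.println(state.naiveKeys("ns1").sorted().collect(Collectors.toList()));
        // prints [0, 1, 2]
        System.out.println(state.distinctKeys("ns1").sorted().collect(Collectors.toList()));
    }
}
```

In this model the assertions of testMapStateGetKeys() would pass against distinctKeys and fail against naiveKeys, because the second call to nextInt() would return 0 again instead of 1.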



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-11 Thread vinoyang (JIRA)


 [ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-9804:
-------------------------------

Assignee: vinoyang

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: vinoyang
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0


