[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9804: - Assignee: Sihua Zhou (was: vinoyang) > KeyedStateBackend.getKeys() does not work on RocksDB MapState > - > > Key: FLINK-9804 > URL: https://issues.apache.org/jira/browse/FLINK-9804 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.5.1 >Reporter: Aljoscha Krettek >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > This can be reproduced by adding this test to {{StateBackendTestBase}}: > {code} > @Test > public void testMapStateGetKeys() throws Exception { > final int namespace1ElementsNum = 1000; > final int namespace2ElementsNum = 1000; > String fieldName = "get-keys-test"; > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > final String ns1 = "ns1"; > MapState<String, Integer> keyedState1 = > backend.getPartitionedState( > ns1, > StringSerializer.INSTANCE, > new MapStateDescriptor<>(fieldName, > StringSerializer.INSTANCE, IntSerializer.INSTANCE) > ); > for (int key = 0; key < namespace1ElementsNum; key++) { > backend.setCurrentKey(key); > keyedState1.put("he", key * 2); > keyedState1.put("ho", key * 2); > } > final String ns2 = "ns2"; > MapState<String, Integer> keyedState2 = > backend.getPartitionedState( > ns2, > StringSerializer.INSTANCE, > new MapStateDescriptor<>(fieldName, > StringSerializer.INSTANCE, IntSerializer.INSTANCE) > ); > for (int key = namespace1ElementsNum; key < > namespace1ElementsNum + namespace2ElementsNum; key++) { > backend.setCurrentKey(key); > keyedState2.put("he", key * 2); > keyedState2.put("ho", key * 2); > } > // valid for namespace1 > try (Stream<Integer> keysStream = backend.getKeys(fieldName, > ns1).sorted()) { > PrimitiveIterator.OfInt actualIterator = > keysStream.mapToInt(value -> value.intValue()).iterator(); > for (int expectedKey = 0; expectedKey < > namespace1ElementsNum; expectedKey++) { > 
 assertTrue(actualIterator.hasNext()); > assertEquals(expectedKey, > actualIterator.nextInt()); > } > assertFalse(actualIterator.hasNext()); > } > // valid for namespace2 > try (Stream<Integer> keysStream = backend.getKeys(fieldName, > ns2).sorted()) { > PrimitiveIterator.OfInt actualIterator = > keysStream.mapToInt(value -> value.intValue()).iterator(); > for (int expectedKey = namespace1ElementsNum; > expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) { > assertTrue(actualIterator.hasNext()); > assertEquals(expectedKey, > actualIterator.nextInt()); > } > assertFalse(actualIterator.hasNext()); > } > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9804: --- Assignee: vinoyang > KeyedStateBackend.getKeys() does not work on RocksDB MapState > - > > Key: FLINK-9804 > URL: https://issues.apache.org/jira/browse/FLINK-9804 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.5.1 >Reporter: Aljoscha Krettek >Assignee: vinoyang >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > This can be reproduced by adding this test to {{StateBackendTestBase}}: > {code} > @Test > public void testMapStateGetKeys() throws Exception { > final int namespace1ElementsNum = 1000; > final int namespace2ElementsNum = 1000; > String fieldName = "get-keys-test"; > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > final String ns1 = "ns1"; > MapState<String, Integer> keyedState1 = > backend.getPartitionedState( > ns1, > StringSerializer.INSTANCE, > new MapStateDescriptor<>(fieldName, > StringSerializer.INSTANCE, IntSerializer.INSTANCE) > ); > for (int key = 0; key < namespace1ElementsNum; key++) { > backend.setCurrentKey(key); > keyedState1.put("he", key * 2); > keyedState1.put("ho", key * 2); > } > final String ns2 = "ns2"; > MapState<String, Integer> keyedState2 = > backend.getPartitionedState( > ns2, > StringSerializer.INSTANCE, > new MapStateDescriptor<>(fieldName, > StringSerializer.INSTANCE, IntSerializer.INSTANCE) > ); > for (int key = namespace1ElementsNum; key < > namespace1ElementsNum + namespace2ElementsNum; key++) { > backend.setCurrentKey(key); > keyedState2.put("he", key * 2); > keyedState2.put("ho", key * 2); > } > // valid for namespace1 > try (Stream<Integer> keysStream = backend.getKeys(fieldName, > ns1).sorted()) { > PrimitiveIterator.OfInt actualIterator = > keysStream.mapToInt(value -> value.intValue()).iterator(); > for (int expectedKey = 0; expectedKey < > namespace1ElementsNum; expectedKey++) { > 
 assertTrue(actualIterator.hasNext()); > assertEquals(expectedKey, > actualIterator.nextInt()); > } > assertFalse(actualIterator.hasNext()); > } > // valid for namespace2 > try (Stream<Integer> keysStream = backend.getKeys(fieldName, > ns2).sorted()) { > PrimitiveIterator.OfInt actualIterator = > keysStream.mapToInt(value -> value.intValue()).iterator(); > for (int expectedKey = namespace1ElementsNum; > expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) { > assertTrue(actualIterator.hasNext()); > assertEquals(expectedKey, > actualIterator.nextInt()); > } > assertFalse(actualIterator.hasNext()); > } > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)