[
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-9804:
Description:
This can be reproduced by adding this test to {{StateBackendTestBase}}:
{code}
/**
 * Reproducer for FLINK-9804: checks that {@code getKeys(state, namespace)} on a map state
 * yields every key of the requested namespace exactly once and no key of another namespace.
 *
 * <p>Two map states are registered under the same state name but in different namespaces.
 * Keys {@code [0, namespace1ElementsNum)} are written in namespace {@code "ns1"} and keys
 * {@code [namespace1ElementsNum, namespace1ElementsNum + namespace2ElementsNum)} in namespace
 * {@code "ns2"}. Each key receives two map entries ({@code "he"} and {@code "ho"}), so a
 * {@code getKeys} implementation that emits one result per stored map entry instead of one
 * per key fails the exact-sequence assertions below.
 *
 * @throws Exception if creating the backend or accessing the state fails
 */
@Test
public void testMapStateGetKeys() throws Exception {
    final int namespace1ElementsNum = 1000;
    final int namespace2ElementsNum = 1000;
    String fieldName = "get-keys-test";
    AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
    try {
        final String ns1 = "ns1";
        MapState<String, Integer> keyedState1 =
            backend.getPartitionedState(
                ns1,
                StringSerializer.INSTANCE,
                new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
            );

        // Two entries per key: the key must still be reported only once by getKeys().
        for (int key = 0; key < namespace1ElementsNum; key++) {
            backend.setCurrentKey(key);
            keyedState1.put("he", key * 2);
            keyedState1.put("ho", key * 2);
        }

        final String ns2 = "ns2";
        MapState<String, Integer> keyedState2 =
            backend.getPartitionedState(
                ns2,
                StringSerializer.INSTANCE,
                new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
            );

        // The second namespace uses a disjoint key range so leakage between namespaces is visible.
        for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
            backend.setCurrentKey(key);
            keyedState2.put("he", key * 2);
            keyedState2.put("ho", key * 2);
        }

        // valid for namespace1
        try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
            PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(Integer::intValue).iterator();

            for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
                assertTrue(actualIterator.hasNext());
                assertEquals(expectedKey, actualIterator.nextInt());
            }

            // No duplicates and no keys from the other namespace may remain.
            assertFalse(actualIterator.hasNext());
        }

        // valid for namespace2
        try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
            PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(Integer::intValue).iterator();

            for (int expectedKey = namespace1ElementsNum;
                    expectedKey < namespace1ElementsNum + namespace2ElementsNum;
                    expectedKey++) {
                assertTrue(actualIterator.hasNext());
                assertEquals(expectedKey, actualIterator.nextInt());
            }

            // No duplicates and no keys from the other namespace may remain.
            assertFalse(actualIterator.hasNext());
        }
    } finally {
        // Release the backend's resources even when an assertion above fails.
        IOUtils.closeQuietly(backend);
        backend.dispose();
    }
}
{code}
> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
> final int namespace1ElementsNum = 1000;
> final int namespace2ElementsNum = 1000;
> String fieldName = "get-keys-test";
> AbstractKeyedStateBackend backend =
> createKeyedBackend(IntSerializer.INSTANCE);
> try {
> final String ns1 = "ns1";
> MapState keyedState1 =
> backend.getPartitionedState(
> ns1,
> StringSerializer.INSTANCE,
> new MapStateDescriptor<>(fieldName,
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
> );
> for (int key = 0; key < namespace1ElementsNum; key++) {
> backend.setCurrentKey(key);
> keyedState1.put("he", key * 2);
> keyedState1.put("ho", key * 2);
> }
> final String ns2 = "ns2";
> MapState keyedState2 =
> backend.getPartitionedState(
> ns2,
> StringSerializer.INSTANCE,
> new MapStateDescriptor<>(fieldName,
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)