[jira] [Commented] (FLINK-9364) Add doc of the memory usage in flink
[ https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571015#comment-16571015 ]

Sihua Zhou commented on FLINK-9364:
-----------------------------------

Hi [~till.rohrmann], I was caught up in some urgent work, so I haven't worked on this recently; I will get back to it once I have some free time.

> Add doc of the memory usage in flink
> -------------------------------------
>
>                 Key: FLINK-9364
>                 URL: https://issues.apache.org/jira/browse/FLINK-9364
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.7.0
>
> We need to add a doc describing memory usage in Flink, especially for users of the RocksDBBackend; many people get confused by it (I've seen several questions related to this on the user mailing list).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink
[ https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9364:
------------------------------
    Fix Version/s:     (was: 1.6.0)
                   1.7.0

> Add doc of the memory usage in flink
> -------------------------------------
>
>                 Key: FLINK-9364
>                 URL: https://issues.apache.org/jira/browse/FLINK-9364
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.7.0
>
> We need to add a doc describing memory usage in Flink, especially for users of the RocksDBBackend; many people get confused by it (I've seen several questions related to this on the user mailing list).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Closed] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou closed FLINK-9804.
-----------------------------
    Resolution: Fixed

Merged in:
1.6.0: def2aed5c75b5a00815186d3343e66cb1dc01ac0
1.5.2: 8a564f82aa521670a0b6813c5deb65586b1fa136

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> --------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
> 	final int namespace1ElementsNum = 1000;
> 	final int namespace2ElementsNum = 1000;
> 	String fieldName = "get-keys-test";
> 	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
> 	try {
> 		final String ns1 = "ns1";
> 		MapState<String, Integer> keyedState1 = backend.getPartitionedState(
> 			ns1,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE));
> 		for (int key = 0; key < namespace1ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState1.put("he", key * 2);
> 			keyedState1.put("ho", key * 2);
> 		}
> 		final String ns2 = "ns2";
> 		MapState<String, Integer> keyedState2 = backend.getPartitionedState(
> 			ns2,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE));
> 		for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState2.put("he", key * 2);
> 			keyedState2.put("ho", key * 2);
> 		}
> 		// valid for namespace1
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 		// valid for namespace2
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 	} finally {
> 		IOUtils.closeQuietly(backend);
> 		backend.dispose();
> 	}
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540077#comment-16540077 ]

Sihua Zhou commented on FLINK-9804:
-----------------------------------

[~aljoscha] Yes, I'm writing the PR description...

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> --------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: vinoyang
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou reassigned FLINK-9804:
---------------------------------
    Assignee: Sihua Zhou  (was: vinoyang)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> --------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540034#comment-16540034 ]

Sihua Zhou commented on FLINK-9804:
-----------------------------------

Hi [~yanghua], have you already started working on this? I just finished it but forgot to assign the ticket to myself. Anyway, this is my code ([https://github.com/sihuazhou/flink/commit/efc4096a4a6f38b3acb0b5189804f7b452218f23]); I hope it can reduce your work or give you some help. ;)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> --------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: vinoyang
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
[ https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533506#comment-16533506 ]

Sihua Zhou commented on FLINK-9736:
-----------------------------------

Hi [~yuzhih...@gmail.com], AFAIK {{heapOfKeyGroupHeaps}} will never be empty. As I mentioned above, it is created in the constructor of {{KeyGroupPartitionedPriorityQueue}} to maintain the timer structure based on the heap of each key group, so the number of elements in {{heapOfKeyGroupHeaps}} should equal the number of key groups in each task. As a double check, maybe [~stefanrichte...@gmail.com] could help to confirm this.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> -------------------------------------------------------------------
>
>                 Key: FLINK-9736
>                 URL: https://issues.apache.org/jira/browse/FLINK-9736
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
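The invariant described in the comment above can be shown with a minimal, self-contained sketch. All names here are illustrative stand-ins, not Flink's actual classes: the constructor fills the outer heap with one sub-heap per key group and sub-heaps are never removed, so {{peek()}} on the outer heap never returns null, although polling the head sub-heap can still yield null while that sub-heap is empty.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Illustrative stand-in for the heap-of-heaps layout discussed in FLINK-9736.
public class KeyGroupHeapSketch {
	private final PriorityQueue<PriorityQueue<Long>> heapOfKeyGroupHeaps;

	public KeyGroupHeapSketch(int numKeyGroups) {
		// Order the outer heap by the smallest timer of each sub-heap;
		// empty sub-heaps sort last.
		this.heapOfKeyGroupHeaps = new PriorityQueue<>(
			numKeyGroups,
			Comparator.comparingLong(
				(PriorityQueue<Long> h) -> h.isEmpty() ? Long.MAX_VALUE : h.peek()));
		// One sub-heap per key group, created up front and never removed:
		// this invariant is what keeps the outer peek() from returning null.
		for (int i = 0; i < numKeyGroups; i++) {
			heapOfKeyGroupHeaps.add(new PriorityQueue<>());
		}
	}

	public int numSubHeaps() {
		return heapOfKeyGroupHeaps.size();
	}

	public Long poll() {
		PriorityQueue<Long> headList = heapOfKeyGroupHeaps.peek(); // non-null by construction
		return headList.poll(); // null only when the head sub-heap is empty
	}
}
```

Note that in this sketch the outer {{peek()}} is safe by construction, while the inner {{poll()}} can still return null; whether the caller must handle that case is exactly the question raised in the ticket.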
[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions
[ https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532982#comment-16532982 ]

Sihua Zhou commented on FLINK-9735:
-----------------------------------

I think this might be a trade-off in the design:
- 1) let the user create the options based on a pre-configured object, to get good performance in Flink (e.g. setFsync(false), because Flink does not rely on RocksDB data on disk for recovery);
- 2) give the user the chance to create the options entirely by themselves.

Given this design, the user needs to follow the rule outlined in the javadoc, but I agree that the OptionsFactory API is a bit easy to misuse, leading to the currentOptions parameter being abandoned. Maybe a better interface could be provided to address this.

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ------------------------------------------------------------
>
>                 Key: FLINK-9735
>                 URL: https://issues.apache.org/jira/browse/FLINK-9735
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor
>
> Here is the related code:
> {code}
> if (optionsFactory != null) {
>     opt = optionsFactory.createDBOptions(opt);
> }
> {code}
> opt, a DBOptions instance, should be closed before being rewritten.
> getColumnOptions has a similar issue.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions
[ https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532956#comment-16532956 ]

Sihua Zhou commented on FLINK-9735:
-----------------------------------

Oh [~yuzhih...@gmail.com], you are right! The usage in {{RocksDBResource}} should be fixed; it didn't follow the OptionsFactory javadoc. So the true issue here is not the code

{code:java}
if (optionsFactory != null) {
    opt = optionsFactory.createDBOptions(opt);
}
{code}

in {{RocksDBStateBackend}}, but the usage in {{RocksDBResource}}. Do you mind if I change this ticket's description and title to reflect the true issue?

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ------------------------------------------------------------
>
>                 Key: FLINK-9735
>                 URL: https://issues.apache.org/jira/browse/FLINK-9735
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions
[ https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532728#comment-16532728 ]

Sihua Zhou commented on FLINK-9735:
-----------------------------------

Hi [~yuzhih...@gmail.com], I think this is a non-issue, because the opt object is intended to be reused. If we look at the javadoc of {{OptionsFactory#createDBOptions()}} we can see that this is by design:

{code:java}
public interface OptionsFactory extends java.io.Serializable {

	/**
	 * This method should set the additional options on top of the current options object.
	 * The current options object may contain pre-defined options based on flags that have
	 * been configured on the state backend.
	 *
	 * It is important to set the options on the current object and return the result from
	 * the setter methods, otherwise the pre-defined options may get lost.
	 *
	 * @param currentOptions The options object with the pre-defined options.
	 * @return The options object on which the additional options are set.
	 */
	DBOptions createDBOptions(DBOptions currentOptions);

	/**
	 * This method should set the additional options on top of the current options object.
	 * The current options object may contain pre-defined options based on flags that have
	 * been configured on the state backend.
	 *
	 * It is important to set the options on the current object and return the result from
	 * the setter methods, otherwise the pre-defined options may get lost.
	 *
	 * @param currentOptions The options object with the pre-defined options.
	 * @return The options object on which the additional options are set.
	 */
	ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
}
{code}

What do you think?

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ------------------------------------------------------------
>
>                 Key: FLINK-9735
>                 URL: https://issues.apache.org/jira/browse/FLINK-9735
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
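The contract in the javadoc above can be demonstrated with a small, self-contained sketch. {{DBOptionsLike}} and {{OptionsFactoryLike}} are hypothetical stand-ins for RocksDB's DBOptions and Flink's OptionsFactory, used only to show why mutating and returning the same object preserves the pre-defined options and leaves no second options object behind to leak.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.rocksdb.DBOptions: a mutable bag of settings
// with chainable setters, mirroring the real options object's style.
class DBOptionsLike {
	final Map<String, Object> settings = new HashMap<>();

	DBOptionsLike setUseFsync(boolean v) { settings.put("use_fsync", v); return this; }
	DBOptionsLike setMaxBackgroundJobs(int v) { settings.put("max_background_jobs", v); return this; }
}

// Hypothetical stand-in for the OptionsFactory contract quoted above:
// set additional options on the current object and return it.
interface OptionsFactoryLike {
	DBOptionsLike createDBOptions(DBOptionsLike currentOptions);
}

public class OptionsFactoryContract {
	// Mirrors the backend's call site: pre-defined options go in,
	// and (per the contract) the same object comes back.
	public static DBOptionsLike applyFactory(DBOptionsLike preDefined, OptionsFactoryLike factory) {
		return factory.createDBOptions(preDefined);
	}

	public static void main(String[] args) {
		DBOptionsLike preDefined = new DBOptionsLike().setUseFsync(false); // set by the backend
		DBOptionsLike result = applyFactory(preDefined, current -> current.setMaxBackgroundJobs(4));
		// Same object returned: nothing extra to close, pre-defined options preserved.
		System.out.println(result == preDefined);              // true
		System.out.println(result.settings.get("use_fsync")); // false
	}
}
```

A factory that instead returned a freshly constructed options object would both drop the pre-defined flags and, with the real RocksDB classes, leave the passed-in native object unclosed, which is the leak scenario discussed in this thread.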
[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
[ https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532726#comment-16532726 ]

Sihua Zhou commented on FLINK-9736:
-----------------------------------

Hi [~yuzhih...@gmail.com], I think this might be a non-issue, because {{heapOfKeyGroupHeaps}} is a heap-of-heaps created in the constructor of {{KeyGroupPartitionedPriorityQueue}} to maintain the timer structure based on the heap of each key group, and we never call {{poll()}} on it.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> -------------------------------------------------------------------
>
>                 Key: FLINK-9736
>                 URL: https://issues.apache.org/jira/browse/FLINK-9736
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Closed] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully
[ https://issues.apache.org/jira/browse/FLINK-9351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou closed FLINK-9351.
-----------------------------
    Resolution: Duplicate

This issue has been fixed along the way in the PR for [FLINK-9456|https://issues.apache.org/jira/browse/FLINK-9456].

> RM stop assigning slot to Job because the TM killed before connecting to JM successfully
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-9351
>                 URL: https://issues.apache.org/jira/browse/FLINK-9351
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Critical
>             Fix For: 1.6.0
>
> The steps are the following (copied from Stephan's comments in [5931|https://github.com/apache/flink/pull/5931]):
> - JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager
> - ResourceManager starts a container with a TaskManager
> - TaskManager registers at ResourceManager, which tells the TaskManager to push a slot to the JobManager.
> - TaskManager container is killed
> - The ResourceManager does not queue back the slot requests (AllocationIDs) that it sent to the previous TaskManager, so the requests are lost and need to time out before another attempt is tried.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9474) Add runtime support of distinct filter using ElasticBloomFilter
[ https://issues.apache.org/jira/browse/FLINK-9474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9474:
------------------------------
    Description: 
We can implement an approximate version of "distinct filter" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore; its accuracy could be configurable, e.g. 95%, 98%, and it might be unable to support retraction.

  (was: We can implement an approximate version of "count distinct" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore; its accuracy could be configurable, e.g. 95%, 98%.)

> Add runtime support of distinct filter using ElasticBloomFilter
> ----------------------------------------------------------------
>
>                 Key: FLINK-9474
>                 URL: https://issues.apache.org/jira/browse/FLINK-9474
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> We can implement an approximate version of "distinct filter" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore; its accuracy could be configurable, e.g. 95%, 98%, and it might be unable to support retraction.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9474) Add runtime support of distinct filter using ElasticBloomFilter
[ https://issues.apache.org/jira/browse/FLINK-9474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9474:
------------------------------
    Summary: Add runtime support of distinct filter using ElasticBloomFilter  (was: Introduce an approximate version of "count distinct")

> Add runtime support of distinct filter using ElasticBloomFilter
> ----------------------------------------------------------------
>
>                 Key: FLINK-9474
>                 URL: https://issues.apache.org/jira/browse/FLINK-9474
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> We can implement an approximate version of "count distinct" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore; its accuracy could be configurable, e.g. 95%, 98%.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9619:
------------------------------
    Issue Type: Improvement  (was: Bug)

> Always close the task manager connection when the container is completed in YarnResourceManager
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9619
>                 URL: https://issues.apache.org/jira/browse/FLINK-9619
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN
>    Affects Versions: 1.6.0, 1.5.1
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
> We should always eagerly close the connection with the task manager when the container is completed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9619:
------------------------------
    Priority: Major  (was: Critical)

> Always close the task manager connection when the container is completed in YarnResourceManager
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9619
>                 URL: https://issues.apache.org/jira/browse/FLINK-9619
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN
>    Affects Versions: 1.6.0, 1.5.1
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.6.0, 1.5.1
>
> We should always eagerly close the connection with the task manager when the container is completed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9661) TTL state should support to do time shift after restoring from checkpoint (savepoint).
Sihua Zhou created FLINK-9661:
---------------------------------

             Summary: TTL state should support to do time shift after restoring from checkpoint (savepoint).
                 Key: FLINK-9661
                 URL: https://issues.apache.org/jira/browse/FLINK-9661
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
    Affects Versions: 1.6.0
            Reporter: Sihua Zhou


The initial version of the TTL state appends the expiration timestamp to the state record and, when accessing the state, checks it with the condition {{expired_timestamp <= current_time}}; if the condition holds, the record is expired, otherwise it is still alive. This works fine in most cases, but in some cases we need to do a time shift, otherwise we may get unexpected results when using processing time. I roughly describe two such cases below.

- Restoring the job from a savepoint. For example, the user sets the TTL of the state to 2h. If he triggers a savepoint and restores the job from it more than 2h later (maybe something delayed him from restoring the job quickly), then all of the restored job's previous state data is already expired.

- The job takes a long time to recover from a failure. For example, many jobs are running on a YARN session cluster that is configured to store checkpoint data in a DFS. Unfortunately, the DFS hits a strange problem that makes the jobs on the cluster loop in recovery-fail-recovery-fail... The devs spend some time addressing the DFS issue and the jobs start working properly again, but if "{{system down time >= TTL}}" then the jobs' previous state data will all be expired.

To avoid the problems above, we need to do a time shift after the job recovers from a checkpoint or savepoint. A possible approach is outlined in [6186|https://github.com/apache/flink/pull/6186].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
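The time-shift idea described above can be sketched in a few lines. This is a hedged, illustrative sketch, not the implementation in the linked PR; method and variable names are assumptions. On restore, the downtime between the checkpoint/savepoint timestamp and the restore timestamp is added to each record's expiration timestamp, so the downtime does not count against the TTL.

```java
// Illustrative sketch of the time shift for TTL state; names are assumptions,
// not Flink's actual API.
public class TtlTimeShiftSketch {

	// The expiry check described in the ticket: expired_timestamp <= current_time.
	public static boolean isExpired(long expirationTimestamp, long currentTime) {
		return expirationTimestamp <= currentTime;
	}

	// Push a record's expiration timestamp forward by the downtime, so state
	// that was alive at checkpoint time keeps its remaining TTL after restore.
	public static long shiftOnRestore(long expirationTimestamp, long checkpointTimestamp, long restoreTimestamp) {
		long downtime = Math.max(0L, restoreTimestamp - checkpointTimestamp);
		return expirationTimestamp + downtime;
	}
}
```

For example, a record that would have expired at t=100 in a savepoint taken at t=50 and restored at t=80 (30 units of downtime) is shifted to expire at t=130, instead of losing 30 units of its remaining lifetime to the downtime.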
[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522981#comment-16522981 ]

Sihua Zhou commented on FLINK-9585:
-----------------------------------

[~Zentol] thanks for pointing this issue out to me, got it.

> Logger in ZooKeeperStateHandleStore is public and non-final
> ------------------------------------------------------------
>
>                 Key: FLINK-9585
>                 URL: https://issues.apache.org/jira/browse/FLINK-9585
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Trivial
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Closed] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou closed FLINK-9585.
-----------------------------
    Resolution: Fixed

Merged in:
master: 5fa61d8ceac8f865002dc0ef84dc1a3c65753d0b

> Logger in ZooKeeperStateHandleStore is public and non-final
> ------------------------------------------------------------
>
>                 Key: FLINK-9585
>                 URL: https://issues.apache.org/jira/browse/FLINK-9585
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Trivial
>              Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
Sihua Zhou created FLINK-9633:
---------------------------------

             Summary: Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
                 Key: FLINK-9633
                 URL: https://issues.apache.org/jira/browse/FLINK-9633
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.5.0
            Reporter: Sihua Zhou
            Assignee: Sihua Zhou
             Fix For: 1.6.0, 1.5.1


Currently, Flink uses the savepoint's filesystem to create the meta output stream in CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses the checkpoint's filesystem to create the checkpoint data output stream. When the savepoint and checkpoint are on different filesystems, this is problematic.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517878#comment-16517878 ]

Sihua Zhou commented on FLINK-9622:
-----------------------------------

One more instance: https://api.travis-ci.org/v3/job/394119180/log.txt

> DistributedCacheDfsTest failed on travis
> -----------------------------------------
>
>                 Key: FLINK-9622
>                 URL: https://issues.apache.org/jira/browse/FLINK-9622
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.6.0
>            Reporter: Sihua Zhou
>            Priority: Major
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakily on Travis.
> Instance: https://api.travis-ci.org/v3/job/394399700/log.txt

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517874#comment-16517874 ]

Sihua Zhou commented on FLINK-9622:
-----------------------------------

cc [~dawidwys]

> DistributedCacheDfsTest failed on travis
> -----------------------------------------
>
>                 Key: FLINK-9622
>                 URL: https://issues.apache.org/jira/browse/FLINK-9622
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.6.0
>            Reporter: Sihua Zhou
>            Priority: Major
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakily on Travis.
> Instance: https://api.travis-ci.org/v3/job/394399700/log.txt

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9622) DistributedCacheDfsTest failed on travis
Sihua Zhou created FLINK-9622: - Summary: DistributedCacheDfsTest failed on travis Key: FLINK-9622 URL: https://issues.apache.org/jira/browse/FLINK-9622 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.0 Reporter: Sihua Zhou DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakily on travis. instance: https://api.travis-ci.org/v3/job/394399700/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
Sihua Zhou created FLINK-9619: - Summary: Always close the task manager connection when the container is completed in YarnResourceManager Key: FLINK-9619 URL: https://issues.apache.org/jira/browse/FLINK-9619 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.6.0, 1.5.1 Reporter: Sihua Zhou Assignee: Sihua Zhou Fix For: 1.6.0, 1.5.1 We should always eagerly close the connection with the task manager when the container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread
[ https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517790#comment-16517790 ] Sihua Zhou commented on FLINK-9417: --- Hi [~till.rohrmann] One thing comes to my mind: if we send heartbeat requests from the RPC endpoint's main thread, should we also check the HEARTBEAT_INTERVAL against a sane minimum value (currently it only needs to be greater than 0)? If the user configures a very small value, e.g. 10, then the resource manager and the job master will be kept busy just sending heartbeats. > Send heartbeat requests from RPC endpoint's main thread > --- > > Key: FLINK-9417 > URL: https://issues.apache.org/jira/browse/FLINK-9417 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > > Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat > requests to remote targets. This has the problem that we still see heartbeats > from this endpoint even if its main thread is currently blocked. Due to this, > the heartbeat response cannot be processed and the remote target times out. > On the remote side, this won't be noticed because it still receives the > heartbeat requests. > A solution to this problem would be to send the heartbeat requests to the > remote target through the RPC endpoint's main thread. That way, the > heartbeats would also be blocked if the main thread is blocked/busy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
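The sanity check proposed in the comment above could look like the following minimal sketch. The minimum value, class, and method names are hypothetical; Flink's actual validation at the time only required the interval to be greater than 0.

```java
// Hypothetical sketch of a sanity check for a configured heartbeat interval.
public class HeartbeatConfigCheck {

    // Illustrative minimum, not Flink's real constant.
    static final long MIN_HEARTBEAT_INTERVAL_MS = 1_000L;

    // Rejects non-positive values and clamps values so small that the
    // resource manager / job master main threads would do nothing but heartbeat.
    static long validatedInterval(long configuredMs) {
        if (configuredMs <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        if (configuredMs < MIN_HEARTBEAT_INTERVAL_MS) {
            return MIN_HEARTBEAT_INTERVAL_MS; // clamp (one could also reject)
        }
        return configuredMs;
    }

    public static void main(String[] args) {
        System.out.println(validatedInterval(10));    // clamped to 1000
        System.out.println(validatedInterval(5_000)); // kept as 5000
    }
}
```

Whether to clamp or to fail fast on too-small values is a design choice; clamping keeps old configurations working, failing fast surfaces the misconfiguration.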
[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()
Sihua Zhou created FLINK-9613: - Summary: YARNSessionCapacitySchedulerITCase failed because of YarnTestBase.checkClusterEmpty() Key: FLINK-9613 URL: https://issues.apache.org/jira/browse/FLINK-9613 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.0 Reporter: Sihua Zhou The test YARNSessionCapacitySchedulerITCase failed on travis because of YarnTestBase.checkClusterEmpty(). https://api.travis-ci.org/v3/job/394017104/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9601) Snapshot of CopyOnWriteStateTable will fail when the amount of records is more than MAXIMUM_CAPACITY
[ https://issues.apache.org/jira/browse/FLINK-9601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9601: -- Summary: Snapshot of CopyOnWriteStateTable will fail when the amount of records is more than MAXIMUM_CAPACITY (was: Snapshot of CopyOnWriteStateTable will fail, when the amount of records is more than MAXIMUM_CAPACITY) > Snapshot of CopyOnWriteStateTable will fail when the amount of records is > more than MAXIMUM_CAPACITY > - > > Key: FLINK-9601 > URL: https://issues.apache.org/jira/browse/FLINK-9601 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > In short, the problem is that we reuse the `snapshotData` as the output array > when partitioning the input data, but the `snapshotData`'s max length is `1 > << 30`. So when the number of records in `CopyOnWriteStateTable` is more than `1 << 30` > (e.g. `(1 << 30) + 1`), the check > `Preconditions.checkState(partitioningDestination.length >= > numberOfElements);` can fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9601) Snapshot of CopyOnWriteStateTable will fail, when the amount of records is more than MAXIMUM_CAPACITY
Sihua Zhou created FLINK-9601: - Summary: Snapshot of CopyOnWriteStateTable will fail, when the amount of records is more than MAXIMUM_CAPACITY Key: FLINK-9601 URL: https://issues.apache.org/jira/browse/FLINK-9601 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.6.0 Reporter: Sihua Zhou Assignee: Sihua Zhou Fix For: 1.6.0 In short, the problem is that we reuse the `snapshotData` as the output array when partitioning the input data, but the `snapshotData`'s max length is `1 << 30`. So when the number of records in `CopyOnWriteStateTable` is more than `1 << 30` (e.g. `(1 << 30) + 1`), the check `Preconditions.checkState(partitioningDestination.length >= numberOfElements);` can fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
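The failing precondition can be illustrated without Flink: the reused snapshot array can hold at most `1 << 30` entries, so a single extra record makes the length check fail. The class and method names below are illustrative, mirroring the description above.

```java
// Illustrative sketch of the FLINK-9601 precondition.
public class SnapshotPartitionCheck {

    // The table's array capacity tops out at 1 << 30 entries.
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Mirrors `Preconditions.checkState(partitioningDestination.length >= numberOfElements)`:
    // the reused array must be able to hold every element being partitioned.
    static boolean canReuseAsPartitionOutput(int destinationLength, long numberOfElements) {
        return destinationLength >= numberOfElements;
    }

    public static void main(String[] args) {
        long records = (1L << 30) + 1; // one more record than the array can hold
        System.out.println(canReuseAsPartitionOutput(MAXIMUM_CAPACITY, records)); // false
        System.out.println(canReuseAsPartitionOutput(MAXIMUM_CAPACITY, 1L << 30)); // true
    }
}
```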
[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9455: - Assignee: Till Rohrmann (was: Sihua Zhou) > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513336#comment-16513336 ] Sihua Zhou commented on FLINK-9455: --- Hi [~till.rohrmann], the more I think about this, the trickier I find it. Since you are probably the person most familiar with this part, and I don't want to mess it up, I will leave this issue to you. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
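The slot accounting described in the issue reduces to a ceiling division: a slot-aware SlotManager only needs ceil(pendingSlotRequests / slotsPerTaskManager) new TaskManagers, whereas a slot-unaware one would start one per pending request. A hypothetical sketch (not Flink's actual SlotManager code):

```java
// Hypothetical sketch of slot-aware TaskManager demand calculation.
public class TaskManagerDemand {

    // ceil(pendingSlotRequests / slotsPerTaskManager) via integer arithmetic.
    static int requiredTaskManagers(int pendingSlotRequests, int slotsPerTaskManager) {
        return (pendingSlotRequests + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // 5 pending slot requests, TaskManagers start with 4 slots each:
        System.out.println(requiredTaskManagers(5, 4)); // 2 (slot-aware)
        // a slot-unaware SlotManager would have requested 5 TaskManagers,
        // 3 of which would sit idle until the idle timeout frees them.
    }
}
```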
[jira] [Assigned] (FLINK-9584) Unclosed streams in Bucketing-/RollingSink
[ https://issues.apache.org/jira/browse/FLINK-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9584: - Assignee: Sihua Zhou > Unclosed streams in Bucketing-/RollingSink > -- > > Key: FLINK-9584 > URL: https://issues.apache.org/jira/browse/FLINK-9584 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Sihua Zhou >Priority: Major > > There are 4 instances of {{FSDataOutputStream}} that are not properly closed > in the {{BucketingSink}} (2) and {{RollingSink}} (2). > > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java#L536] > > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java#L705] > > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L638] > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#882] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
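The usual remedy for stream leaks like the ones linked above is try-with-resources, which closes the stream on every exit path, including exceptions. A minimal, Flink-independent sketch (the helper name is made up):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: try-with-resources guarantees close() even when
// an exception is thrown mid-write, unlike a bare open/write sequence.
public class StreamClosing {

    static byte[] writeAll(byte[] data) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = sink) { // closed automatically on every exit path
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeAll(new byte[]{1, 2, 3}).length); // 3
    }
}
```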
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510828#comment-16510828 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow], I haven't seen the email you sent yet, but I just had a look at your code. I think the "non-scalable" behavior might be caused by your test code. From your code we can see that the source's parallelism is always the same as that of the other operators, and in each sub-task of the source you use a loop to mock the source records. That means the QPS of the source will increase when you try to scale up the parallelism of your job, so in the end you didn't scale anything up at all. I would suggest setting the parallelism of the source to a fixed value (e.g. 4), then scaling up the job, and seeing whether it is scalable. I haven't tested your code on a cluster yet, but will test it later. My email is "summerle...@163.com"; if you have problems sending email to "u...@flink.apache.org", you can send it to me personally if you want. Thanks~ > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and the filesystem (hdfs) is used as the state backend. > It can easily be reproduced with a simple app: without checkpointing, just > keep storing records, with a simple reduction function (in fact, an empty > function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second are processed in "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". > {code} > DataStream<Record> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy() > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 String private members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
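The scaling argument in the comment above is simple arithmetic: if every source sub-task emits records in a tight loop, the total offered load grows with parallelism, so a "scale-up" test measures nothing. A hypothetical back-of-the-envelope sketch:

```java
// Hypothetical sketch: why tying source parallelism to job parallelism
// makes a scale-up benchmark meaningless. All numbers are illustrative.
public class ScalingMock {

    // Each source sub-task emits perSubtaskQps records in a tight loop,
    // so total input scales linearly with source parallelism.
    static long totalSourceQps(int sourceParallelism, long perSubtaskQps) {
        return (long) sourceParallelism * perSubtaskQps;
    }

    public static void main(String[] args) {
        long perSubtask = 10_000;
        // Source parallelism tied to job parallelism: doubling the job doubles the input.
        System.out.println(totalSourceQps(4, perSubtask)); // 40000
        System.out.println(totalSourceQps(8, perSubtask)); // 80000
        // Pinning the source to 4 sub-tasks keeps the offered load constant
        // while the rest of the job scales from 4 to 8.
    }
}
```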
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510757#comment-16510757 ] Sihua Zhou commented on FLINK-9506: --- Hi, I think you could send your question to the "u...@flink.apache.org". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510750#comment-16510750 ] Sihua Zhou commented on FLINK-9506: --- I'm +1 to close this ticket and move to the ML; we can definitely continue the discussion there. I will try out your code and give feedback tonight; please bear with me for several hours, as I'm currently busy with other things. Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510716#comment-16510716 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow], "If we further comment out recordStore.add() then everything works well, no more fluctuation" surprised me, because for the RocksDB backend `listState.add()` just merges (a put without any read) the record into the DB, which is cheap in my mind. I have downloaded your code. Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510649#comment-16510649 ] Sihua Zhou commented on FLINK-9506: --- Additionally, are you sure that you are using the RocksDB backend? Can you find "Initializing RocksDB keyed state backend" in the TaskManager's log? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510647#comment-16510647 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow], in input_stop_when_timer_run.png, does the yellow line mean the QPS of the input, and the green line the QPS of the output? If this picture was captured with the content of onTimer still in place, it doesn't surprise me, but if it was captured with the content of onTimer commented out, it surprises me a bit. You mentioned that the fluctuation still exists when the content of onTimer is commented out; does "commented out" mean that there is nothing left in onTimer()? If yes, that surprises me, and additionally, could you also comment out the `recordStore.add()` in processElement()? If both the content of onTimer() and the `recordStore.add()` are commented out and the fluctuation is still there, I think the problem is related to the timer, because of GC. I'm also curious about the QPS of the source for your job, and the degree of parallelism of your job. Thanks~ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510580#comment-16510580 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow], I don't think this is a limitation in Flink; we run more complex data flows with heavier load in production and Flink supports them very well. Let's look into your case more deeply. - Did you enable checkpointing now? If yes, are you using incremental checkpoints, and what is the checkpoint interval? - Could you try commenting out the code related to the accumulation in `onTimer`? Specifically, comment out the line "listState.get()". - Is it possible for you to provide some of the code related to the `ProcessAggregation` you are currently using? Thanks -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818 ] Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM: Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any state in it? Or could you provide some code of the empty ProcessAggregation? In fact, it's a bit hard for me to believe the fluctuation is caused by keyBy. AFAIK, it just controls which channel the record goes to (when transferred between operators) and the content of the key stored in RocksDB; without using any state, keyBy() should be cheap. I think the KeyNoHash vs KeyHash picture is what I expected: with hash() the key's length is only 4 bytes and the distribution is uniform, while without hash your key's length is 50 and the distribution may not be uniform. But with the hash() approach you can only get an approximate result. If that is enough for you, then I think it's good to go now; is it not enough? was (Author: sihuazhou): Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any state in the empty ProcessAggregation? Or could you somehow provide some code of the empty ProcessAggregation? I think the picture related to he keyNoHash vs KeyHash is what I expected. With hash() the key's length is only 4 bytes and the distribution is uniform, without hash your key's length is 50 and also the distribution maybe not uniform. But with the hash() approach you could only get a approximate result, if that is enough for you then I think it's good to go now, is it not enough for you? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818 ] Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:26 PM: Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any state in it? Or could you provide some code of the empty ProcessAggregation? I think the KeyNoHash vs KeyHash picture is what I expected: with hash() the key's length is only 4 bytes and the distribution is uniform, while without hash your key's length is 50 and the distribution may not be uniform. But with the hash() approach you can only get an approximate result. If that is enough for you, then I think it's good to go now; is it not enough? was (Author: sihuazhou): Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any state in the empty ProcessAggregation? Or could you somehow provide some code of the empty ProcessAggregation? I think the picture related to he keyNoHash vs KeyHash is what I expected. Without hash() the key's length is only 4 bytes and the distribution is uniform, without hash your key's length is 50 and also the distribution maybe not uniform. But with the hash() approach you could only get a approximate result, if that is enough for you then I think it's good to go now, is it not enough for you? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow] What do you mean by the empty ProcessAggregation? Did you use any state in the empty ProcessAggregation? Or could you provide some code of the empty ProcessAggregation? I think the picture comparing keyNoHash vs KeyHash is what I expected. With hash() the key's length is only 4 bytes and the distribution is uniform; without hash() your key's length is 50 and the distribution may not be uniform. But with the hash() approach you can only get an approximate result; if that is enough for you then I think it's good to go now, is it not enough for you? > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507815#comment-16507815 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow] I think we can close this ticket now, do you agree? Thanks~ > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9561) Lack of the api to set RocksDB Option by flink config
[ https://issues.apache.org/jira/browse/FLINK-9561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507249#comment-16507249 ] Sihua Zhou commented on FLINK-9561: --- Hi [~aitozi] I think Flink already supports this: you can implement the {{OptionsFactory}} and override {{OptionsFactory#createDBOptions}} to customize your {{DBOptions}}, and override {{OptionsFactory#createColumnOptions}} to customize your {{ColumnFamilyOptions}}. A simple example looks like the one below {code:java} rocksDbBackend.setOptions(new OptionsFactory() { public DBOptions createDBOptions(DBOptions currentOptions) { return currentOptions.setMaxOpenFiles(1024); } public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions.setCompactionStyle(org.rocksdb.CompactionStyle.LEVEL); } }); {code} > Lack of the api to set RocksDB Option by flink config > - > > Key: FLINK-9561 > URL: https://issues.apache.org/jira/browse/FLINK-9561 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > > We have several PredefinedOptions for RocksDB options, but they are not > configurable; I think we should support this to allow users to choose according > to the device. [~StephanEwen] do you think so? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504729#comment-16504729 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow] Off the top of my head, I list the answers here: - >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other option? RocksDB needs to be set up for every sub-task that uses KeyedState if you are using the RocksDB backend. - >> 2. What is the recommendation for RocksDB's statebackend? We are using tmpfs with checkpoint now with savepoint persists to hdfs. Q1. I think the default configuration of the RocksDB backend is quite good for most jobs. Q2. I'm not sure I got you correctly; a savepoint is triggered manually and a checkpoint is triggered automatically. Do you mean that you trigger savepoints manually on a periodic basis? - >> 3. By source code, rocksdb options like parallelism and certain predefined option could be configured, any corresponding parameter in flink_config.yaml? AFAIK, RocksDB's options need to be set in source code if you need to customize them. The default parallelism of an operator can be configured in flink-conf.yaml - >> 4. Related to your RocksDB config: I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the checkpoint directory, and I'm not sure it's accessible to all TMs. If yes, I think that is OK; also, I didn't see your checkpoint interval... BTW, you said you are using {{r.getUNIQUE_KEY();}} as the key; I'm a bit curious about its length in general. If it's too long and you don't need an exact result, you could use {{r.getUNIQUE_KEY().hashCode();}} instead; that may also help to improve performance. And in fact, I also agree with [~kkrugler] that this type of question is best asked on the user mailing list; that way more people can take part and you might also get more ideas from them.
;) > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > > > Hi, we found out application performance drop more than 100% when > ReducingState.add is used in the source code. In the test checkpoint is > disable. And filesystem(hdfs) as statebackend. > It could be easyly reproduce with a simple app, without checkpoint, just > simply keep storing record, also with simple reduction function(in fact with > empty function would see the same result). Any idea would be appreciated. > What an unbelievable obvious issue. > Basically the app just keep storing record into the state, and we measure how > many record per second in "JsonTranslator", which is shown in the graph. The > difference between is just 1 line, comment/un-comment "recStore.add(r)". > {code} > DataStream stream = env.addSource(new GeneratorSource(loop); > DataStream convert = stream.map(new JsonTranslator()) >.keyBy() >.process(new ProcessAggregation()) >.map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction { > private ReducingState recStore; > public void processElement(Recordr, Context ctx, Collector out) { > recStore.add(r); //this line make the difference > } > {code} > Record is POJO class contain 50 String private member. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
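The key-shortening suggestion above (hashing a long string key down to an int before keying) can be sketched in plain Java. `uniqueKey` below stands in for the hypothetical 50-character `UNIQUE_KEY` described in the ticket; the caveat, as noted in the comment, is that hash collisions make any per-key result approximate:

```java
import java.nio.charset.StandardCharsets;

public class KeyHashSketch {
    // Stand-in for a hypothetical 50-character unique key from the ticket.
    static String uniqueKey(int i) {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 50) {
            sb.append("key-").append(i).append('-');
        }
        return sb.substring(0, 50);
    }

    public static void main(String[] args) {
        String key = uniqueKey(42);
        int hashedKey = key.hashCode(); // 4-byte surrogate key

        // The raw key costs 50 bytes per record; the hashed key is always 4.
        System.out.println(key.getBytes(StandardCharsets.UTF_8).length); // 50
        System.out.println(Integer.BYTES);                               // 4

        // hashCode() is deterministic, so the same record always maps to the
        // same key -- but distinct keys may collide, which is why results
        // keyed on the hash are only approximate.
        System.out.println(hashedKey == uniqueKey(42).hashCode()); // true
    }
}
```

A usage note: in the job from the ticket, this would mean `keyBy` on the hash rather than the raw string, trading exactness for smaller, uniformly distributed keys.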
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505 ] Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM: --- [~yow] Maybe there is one more optimization you could try. I see you are using the ReducingState in your code just to accumulate `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The ReducingState works as follows: - get the "old result" from RocksDB. - reduce the "old result" with the input, and put the "new result" back to RocksDB. That means for every input record in processElement(), it needs to do a `get` and a `put` against RocksDB, and the `get` costs much more than the `put`. I would suggest using ListState instead. With ListState, what you need to do is: - Perform {{ListState.add(record)}} in {{processElement()}}; `ListState.add()` is cheap because it only appends the record to RocksDB. - Perform the reduction in {{onTimer()}}; it might look as follows: {code:java} Iterable<JSONObject> records = listState.get(); for (JSONObject jsonObj : records) { // do accumulation } out.collect(result); {code} In this way, for every key, every second, you only need to do one read operation against RocksDB. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png >
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
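The access-pattern argument above can be illustrated with a plain-Java mock of the store (hypothetical classes, not Flink's API) that counts operations: the reduce-per-element pattern pays one get and one put per record, while the append-then-fold pattern pays one cheap append per record and a single get when the timer fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatePatternSketch {
    static int gets = 0, puts = 0, appends = 0;

    // Reducing pattern: one get + one put against the store per element.
    static void reducePerElement(Map<String, Integer> store, String key, int v) {
        gets++;
        Integer old = store.get(key);
        puts++;
        store.put(key, old == null ? v : old + v);
    }

    // List pattern: cheap append per element, no read on the hot path.
    static void appendPerElement(Map<String, List<Integer>> store, String key, int v) {
        appends++;
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(v);
    }

    // One read per key when the timer fires, folding the whole list.
    static int foldOnTimer(Map<String, List<Integer>> store, String key) {
        gets++;
        int sum = 0;
        for (int v : store.get(key)) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        int n = 1000;
        Map<String, Integer> reduced = new HashMap<>();
        for (int i = 0; i < n; i++) reducePerElement(reduced, "k", 1);
        int reduceGets = gets; // n expensive reads before any timer fires

        gets = 0;
        Map<String, List<Integer>> listed = new HashMap<>();
        for (int i = 0; i < n; i++) appendPerElement(listed, "k", 1);
        int sum = foldOnTimer(listed, "k"); // a single read per key per timer
        System.out.println(reduceGets + " vs " + gets + ", sum=" + sum);
    }
}
```

Both patterns produce the same aggregate; only the number of reads differs, which is the point of the suggestion in the comment.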
[jira] [Created] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
Sihua Zhou created FLINK-9546: - Summary: The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0 Key: FLINK-9546 URL: https://issues.apache.org/jira/browse/FLINK-9546 Project: Flink Issue Type: Bug Components: Core Reporter: Sihua Zhou Assignee: Sihua Zhou The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, currently the arg check looks like {code:java} Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); {code} it should be {code:java} Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0."); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
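The off-by-one the ticket describes is easy to demonstrate with a plain re-implementation of `checkArgument` (sketched here rather than using Guava/Flink's class): the `>= 0` check lets a zero timeout through even though the message promises "larger than 0".

```java
public class PreconditionSketch {
    static void checkArgument(boolean condition, String message) {
        if (!condition) throw new IllegalArgumentException(message);
    }

    // strict=false models the buggy `>= 0` check; strict=true the proposed `> 0`.
    static boolean accepts(long timeoutMs, boolean strict) {
        try {
            checkArgument(strict ? timeoutMs > 0L : timeoutMs >= 0L,
                "The heartbeat timeout interval has to be larger than 0.");
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The buggy check accepts a zero timeout; the fixed check rejects it.
        System.out.println(accepts(0L, false)); // true  -> bug
        System.out.println(accepts(0L, true));  // false -> fixed
    }
}
```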
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505 ] Sihua Zhou commented on FLINK-9506: --- [~yow] Maybe there is one more optimization you could try. I see you are using the ReducingState in your code just to accumulate `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The ReducingState works as follows: - get the "old result" from RocksDB. - reduce the "old result" with the input, and put the "new result" back to RocksDB. That means for every input record in processElement(), it needs to do a `get` and a `put` against RocksDB, and the `get` costs much more than the `put`. I would suggest using ListState instead. With ListState, what you need to do is: - Perform {{ListState.add(record)}} in {{processElement()}}; `ListState.add()` is cheap because it only appends the record to RocksDB. - Perform the reduction in {{onTimer()}}; it might look as follows: {code:java} Iterable<JSONObject> records = listState.get(); for (JSONObject jsonObj : records) { // do accumulation } out.collect(result); {code} In this way, for every key, every second, you only need to do one read operation against RocksDB. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502935#comment-16502935 ] Sihua Zhou commented on FLINK-9455: --- [~till.rohrmann] Thanks for your reply, I agree. I'll provide an initial design document for this ticket, probably this weekend, but please feel free to take it over anytime if you want to. > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502809#comment-16502809 ] Sihua Zhou commented on FLINK-8845: --- Hi [~noliran] Yes, this is on purpose. In fact, we tried([FLINK-8859|https://issues.apache.org/jira/browse/FLINK-8859]) to disable WAL in RocksDBKeyedStateBackend when restoring the backend, but it will cause segfaults on travis([FLINK-8882|https://issues.apache.org/jira/browse/FLINK-8882]), and the reason why it caused the segfaults is still not clear, so we reverted([8922|https://issues.apache.org/jira/browse/FLINK-8922]) it in the end. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8753) Introduce savepoint that go though the incremental checkpoint path
[ https://issues.apache.org/jira/browse/FLINK-8753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-8753. - Resolution: Invalid > Introduce savepoint that go though the incremental checkpoint path > -- > > Key: FLINK-8753 > URL: https://issues.apache.org/jira/browse/FLINK-8753 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > Right now, a savepoint goes through the full checkpoint path, so taking a savepoint > can be slow. In our production environment, for some long-running jobs it often takes > more than 10 min to complete a savepoint, which is unacceptable for a real-time > job, so we currently have to fall back to using externalized checkpoints > instead. But an externalized checkpoint has a time interval (the checkpoint > interval) since the last one. So I propose to introduce an incremental > savepoint that goes through the incremental checkpoint path. > Any advice would be appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502064#comment-16502064 ] Sihua Zhou commented on FLINK-9455: --- Hi [~till.rohrmann], I think this is a bit trickier than it looks, so I'd like to have a brief discussion with you before jumping into the implementation - Should we consider the situation where every TM request could come with a different ResourceProfile? e.g. different cores and different TM memory. Currently the TM's configuration is "immutable" for every cluster, but I can see a trend that the community may support requesting a user-specific TM according to their config. - Should we hold the assumption that the ResourceProfile we requested from the ResourceManager is definitely the same as the actual ResourceProfile we get from the TM? Or do you have any ideas on this ticket? Thanks~ Sihua > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0, 1.5.1 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
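The resource accounting the ticket asks for boils down to ceiling division: once the SlotManager knows how many slots each TaskManager starts with, it only needs to request `ceil(uncoveredRequests / slotsPerTm)` new TaskManagers instead of one per pending slot request. A minimal sketch (hypothetical method names, not Flink's actual SlotManager API):

```java
public class SlotAccountingSketch {
    // TaskManagers to start when each one comes up with `slotsPerTm` slots,
    // first counting previously requested but not yet registered ("future") slots.
    static int taskManagersToRequest(int pendingSlotRequests, int futureSlots, int slotsPerTm) {
        int uncovered = Math.max(0, pendingSlotRequests - futureSlots);
        return (uncovered + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    public static void main(String[] args) {
        // Slot-unaware behaviour: one TM assumed per request -> 7 TMs.
        System.out.println(taskManagersToRequest(7, 0, 1)); // 7
        // Slot-aware: 7 pending requests, 4 slots per TM -> only 2 TMs.
        System.out.println(taskManagersToRequest(7, 0, 4)); // 2
        // 3 future slots already on the way -> only 1 more TM needed.
        System.out.println(taskManagersToRequest(7, 3, 4)); // 1
    }
}
```

This matches the issue's motivation: without the `futureSlots` term, extra TaskManagers get started and then freed again after the idle timeout.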
[jira] [Commented] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502037#comment-16502037 ] Sihua Zhou commented on FLINK-8601: --- Hi [~aljoscha] [~StephanEwen] I would really appreciate it if either of you could take a look at this; I'm quite confident that this would be a useful feature in many performance optimization scenarios... > Introduce ElasticBloomFilter for Approximate calculation and other situations > of performance optimization > - > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > h2. *Motivation* > There are some scenarios that drove us to introduce this ElasticBloomFilter: one > is Stream Join, another is Data Deduplication, plus some special user > cases... It has given us a great experience; for example, we implemented > the Runtime Filter Join based on it, and it gave us a great performance > improvement. This sets it apart from the "normal stream join": it > allows us to improve performance while reducing resource consumption by about > half!!! > I will list the two most typical user cases optimized by the > ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data > Deduplication" in brief. > *Scenario 1: Runtime Filter Join* > In general, stream join is one of the most performance-costly tasks. For every > record from either side, we need to query the state from the other side; this > leads to poor performance when the state size is huge. So, in production, > we always need to spend a lot of slots to handle stream join. But, indeed, we > can improve this somewhat; there is a phenomenon of stream join that can be observed > in production: the "joined ratio" of the stream join is often very > low, for example.
> - stream join in promotion analysis: the job needs to join the promotion log with > the action (click, view, buy) log on the promotion_id to analyze the effect > of the promotion. > - stream join in AD (advertising) attribution: the job needs to join the AD click > log with the item payment log on the click_id to find which click of which AD > brought the payment, to do attribution. > - stream join in click log analysis of docs: the job needs to join the viewed log (docs > viewed by users) with the click log (docs clicked by users) to analyze the > reason for the click and the properties of the users. > - ...and so on > All these cases have one property in common: the joined ratio is very > low. Here is an example to describe it: we have 1 records from the left > stream and 1 records from the right stream, and we execute select * > from leftStream l join rightStream r on l.id = r.id, and we only get 100 records > in the result. That is a case of low joined ratio; this example is > for inner join, but it also applies to left & right join. > There are more examples of low joined ratio I could come up with... but the point I > want to raise is that a low joined ratio for stream joins in production is > a very common phenomenon (maybe even the most common phenomenon in some > companies; at least in our company that is the case). > *How to improve this?* > We can see from the above case: 1 records join 1 records and we only > get 100 results; that means we query the state 2 times (1 for the > left stream and 1 for the right stream) but only 100 of those queries are > meaningful!!! If we could reduce the number of useless queries, then we could > definitely improve the performance of stream join. > The way we improve this is to introduce the Runtime Filter Join; the > main idea is that we build a filter for the state on each side (left > stream & right stream). When we need to query the state on one side, we first > check the corresponding filter for whether the key could possibly be in the state; if > the filter says "no, it can't possibly be in the state", then we skip querying the > state; if it says "hmm, it may be in the state", then we need to query the state. > As you can see, the best choice for the filter is a Bloom Filter; it has all the > features that we want: extremely good performance and no > false negatives. > The simplest pseudo code for Runtime Filter Join (the comments are based on > RocksDBBackend) > {code:java} > void performJoinNormally(Record recordFromLeftStream) { > Iterator rightIterator = rightStreamState.iterator(); > // perform the `seek()` on the RocksDB, and iterate one by one, > // this is an expensive operation especially when the key can't be > found in RocksDB. >
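The filter the issue describes can be sketched with a minimal Bloom filter in plain Java (a BitSet with two hash probes; the proposed ElasticBloomFilter is far more elaborate). The key property the runtime-filter join relies on is that membership tests may yield false positives but never false negatives, so a "definitely absent" answer safely skips the expensive RocksDB lookup:

```java
import java.util.BitSet;

public class BloomFilterSketch {
    private final BitSet bits;
    private final int size;

    BloomFilterSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two cheap probes derived from hashCode(); real filters use more,
    // independent hash functions for a lower false-positive rate.
    private int h1(String key) { return Math.floorMod(key.hashCode(), size); }
    private int h2(String key) { return Math.floorMod(key.hashCode() * 31 + 17, size); }

    void add(String key) {
        bits.set(h1(key));
        bits.set(h2(key));
    }

    // false => definitely not in state, skip the expensive RocksDB seek;
    // true  => maybe in state, fall through to the real state query.
    boolean mightContain(String key) {
        return bits.get(h1(key)) && bits.get(h2(key));
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1 << 16);
        for (int i = 0; i < 1000; i++) filter.add("key-" + i);

        // No false negatives: every inserted key is reported as present.
        boolean allPresent = true;
        for (int i = 0; i < 1000; i++) allPresent &= filter.mightContain("key-" + i);
        System.out.println(allPresent); // true
    }
}
```

In the join described above, each side would consult such a filter built over the other side's keys before touching its state, which is exactly what makes a low joined ratio cheap.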
[jira] [Closed] (FLINK-8602) Improve recovery performance for rocksdb backend
[ https://issues.apache.org/jira/browse/FLINK-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-8602. - Resolution: Done All sub-tasks are done. > Improve recovery performance for rocksdb backend > > > Key: FLINK-8602 > URL: https://issues.apache.org/jira/browse/FLINK-8602 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > Right now, RocksDB backend supports {{fully checkpoint}} & {{incremental > checkpoint}}. This issue try to improve the performance of recovery from > {{fully checkpoint}} as well as from {{incremental checkpoint}}. It contains > 2 sub-tasks. > 1. improve recovery performance for incremental checkpoint when > {{hasExtraKeys = true}} > 2. introduce `parallel recovery` from both {{fully checkpoint}} and > {{incremental checkpoint}} (Base on {{ingestExternalFile()}} and > {{SstFileWriter}} provided by RocksDB). > Any advice would be highly appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9043) Introduce a friendly way to resume the job from externalized checkpoints automatically
[ https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501312#comment-16501312 ] Sihua Zhou commented on FLINK-9043: --- Okay, I see it is quite tricky to provide a completely automatic way, especially for s3... then how about taking a step back and first extending the current strategy to allow the user to provide the previous jobId, and we search for the last successful checkpoint (automatically skipping corrupt checkpoints)? [~StephanEwen] what do you think? > Introduce a friendly way to resume the job from externalized checkpoints > automatically > -- > > Key: FLINK-9043 > URL: https://issues.apache.org/jira/browse/FLINK-9043 > Project: Flink > Issue Type: New Feature >Reporter: godfrey johnson >Assignee: Sihua Zhou >Priority: Major > > I know a flink job can recover from a checkpoint with a restart strategy, but it cannot > recover the way Spark Streaming jobs can when the job is starting. > Every time, the submitted flink job is regarded as a new job, while the > spark streaming job can detect the checkpoint directory first and > then recover from the latest successful one. However, Flink can only recover > after the job has failed first, and then retries with the strategy. > > So, would flink support recovering from the checkpoint directly in a new job? > h2. New description by [~sihuazhou] > Currently, it's quite unfriendly for users to recover a job from an > externalized checkpoint; the user needs to find the dedicated dir for the job, which > is not an easy thing when there are too many jobs. This ticket intends to > introduce a more friendly way to allow the user to use the externalized > checkpoint to do recovery. > The implementation steps are copied from the comments of [~StephanEwen]: > - We could make this an option where you pass a flag (-r) to automatically > look for the latest checkpoint in a given directory. > - If more than one job checkpointed there before, this operation would fail.
> - We might also need a way to have jobs not create the UUID subdirectory, > otherwise the scanning for the latest checkpoint would not easily work. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499873#comment-16499873 ] Sihua Zhou commented on FLINK-9506: --- [~yow] I think the RocksDB backend could give more stable performance, though peak performance may be reduced; anyway, I think it's worth a try > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499867#comment-16499867 ] Sihua Zhou edited comment on FLINK-9506 at 6/4/18 8:05 AM: --- Thanks for trying it out. Then I think the performance drop and the fluctuation might be caused by the state lookup, and since you are using the heap-based KeyedStateBackend, the fluctuation might be caused by the capacity rescaling of the "Hash Map", but I think the impact should not be that obvious... Maybe [~srichter] could give some more useful and professional information... was (Author: sihuazhou): Thanks for trying it out. Then I think the performance drop and the fluctuation might be caused by the state lookup, and since you are using the heap-based KeyedStateBackend, the fluctuation might be caused by the capacity rescale of the "Hash Map", but I think the impact should not be that obvious... Maybe [~srichter] could give some more useful and professional information... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499867#comment-16499867 ] Sihua Zhou commented on FLINK-9506: --- Thanks for trying it out. Then I think the performance drop and the fluctuation might be caused by the state lookup, and since you are using the heap-based KeyedStateBackend, the fluctuation might be caused by the capacity rescale of the "Hash Map", but I think the impact should not be that obvious... Maybe [~srichter] could give some more useful and professional information... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499672#comment-16499672 ] Sihua Zhou edited comment on FLINK-9506 at 6/4/18 2:22 AM: --- Hi [~yow] could you please just replace the getKey() as follows and give it a try? {code:java} new KeySelector() { @Override public Integer getKey(Record r) throws Exception { return r.getUNIQUE_KEY().hash() % 128; } } {code} If this works, then I think the performance drop may be caused by the state lookup. was (Author: sihuazhou): Hi [~yow] could you please just replace the getKey() as follows and give it a try? {code} new KeySelector() { @Override public Integer getKey(Record r) throws Exception { return r.getUNIQUE_KEY().hash() / 128; } } {code} If this works, then I think the performance drop may be caused by the state lookup. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499672#comment-16499672 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow] could you please just replace the getKey() as follows and give it a try? {code} new KeySelector() { @Override public Integer getKey(Record r) throws Exception { return r.getUNIQUE_KEY().hash() / 128; } } {code} If this works, then I think the performance drop may be caused by the state lookup. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
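The suggestion in the comment above collapses a high-cardinality unique key into a small number of buckets before `keyBy()`. Note the comment was later edited upthread from `hash() / 128` to `hash() % 128`: division does not bound the result to a fixed bucket range, while the modulus does. A plain-Java sketch of the bucketing (a hypothetical helper, not from Flink; `Math.floorMod` is used so negative `hashCode()` values still land in `[0, numBuckets)`):

```java
// Hypothetical sketch of the key-bucketing idea from the comment above.
class KeyBuckets {
    // Collapse an arbitrary key into one of numBuckets buckets.
    // floorMod (unlike %) never returns a negative value.
    static int bucketOf(Object key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }
}
```

Bucketing trades per-key state granularity for fewer, hotter keys, which is why a change in throughput after this swap points at the per-key state lookup as the bottleneck.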
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499395#comment-16499395 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow] could you please give some information about the `keyBy()`? e.g., what are you keying by in keyBy()? Is it also a POJO with 50 String members, or something else? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend
[ https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497956#comment-16497956 ] Sihua Zhou commented on FLINK-9486: --- [~srichter] Thanks for your reply, looking forward to your PR! ;) > Introduce TimerState in keyed state backend > --- > > Key: FLINK-9486 > URL: https://issues.apache.org/jira/browse/FLINK-9486 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > This is the first implementation subtask. > Goal of this PR is to introduce a timer state that is registered with the > keyed state backend, similar to other forms of keyed state. > For the {{HeapKeyedStateBackend}}, this state lives on the same level as the > {{StateTable}} that holds other forms of keyed state, and the implementation > is basically backed by {{InternalTimerHeap}}. > For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this > state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an > intermediate state, and we will later also implement the alternative to store > the timers inside a column family in RocksDB. However, by taking this step, > we could also still offer the option to have RocksDB state with heap-based > timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend
[ https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497940#comment-16497940 ] Sihua Zhou commented on FLINK-9486: --- Hi [~srichter] Are you already working on this? If not, I'd like to take this ticket, if you don't mind. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly
[ https://issues.apache.org/jira/browse/FLINK-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9468: -- Priority: Critical (was: Blocker) > get outputLimit of LimitedConnectionsFileSystem incorrectly > --- > > Key: FLINK-9468 > URL: https://issues.apache.org/jira/browse/FLINK-9468 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit > incorrectly. > {code:java} > private T createStream( > final SupplierWithException streamOpener, > final HashSet openStreams, > final boolean output) throws IOException { > final int outputLimit = output && maxNumOpenInputStreams > 0 ? > maxNumOpenOutputStreams : Integer.MAX_VALUE; > /**/ > } > {code} > should be > {code:java} > private T createStream( > final SupplierWithException streamOpener, > final HashSet openStreams, > final boolean output) throws IOException { > final int outputLimit = output && maxNumOpenOutputStreams > 0 ? > maxNumOpenOutputStreams : Integer.MAX_VALUE; > /**/ > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
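The bug above boils down to guarding the output limit with the input-stream counter: when no input limit is configured, the output limit is silently disabled. As a hypothetical distillation (not Flink's actual code), the corrected selection logic is:

```java
// Distilled form of the fix: the output limit must be guarded by
// maxNumOpenOutputStreams; testing maxNumOpenInputStreams (the bug)
// would disable the output limit whenever no input limit is configured.
class StreamLimits {
    static int outputLimit(boolean output, int maxNumOpenOutputStreams) {
        return output && maxNumOpenOutputStreams > 0
                ? maxNumOpenOutputStreams
                : Integer.MAX_VALUE;
    }
}
```

The `> 0` check treats an unset (zero or negative) limit as "unlimited", which is why the wrong flag in the original condition went unnoticed unless both limits were configured.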
[jira] [Commented] (FLINK-9480) Let local recovery support rescaling
[ https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494908#comment-16494908 ] Sihua Zhou commented on FLINK-9480: --- [~srichter] Thanks for your reply. The reason and the use case for which I want to improve this is the online rescaling feature of 1.5. Currently, it works as follows: - trigger a savepoint - rescale from the savepoint. In order to let online rescaling take advantage of local recovery, we need local recovery to support rescaling. Maybe it doesn't need to be so strict that every node can only restore locally; it could be a best effort: if some node can't find its local state, it can still load the data from remote storage. Yes, I agree that this feature's priority is lower than "timer service" and "ttl state"; I just created it in case we want to do it in the future... > Let local recovery support rescaling > > > Key: FLINK-9480 > URL: https://issues.apache.org/jira/browse/FLINK-9480 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Priority: Major > > Currently, local recovery only supports restoring from a checkpoint, without > rescaling. Maybe we should enable it to support rescaling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9480) Let local recovery support rescaling
[ https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9480: -- Affects Version/s: 1.5.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9480) Let local recovery support rescaling
[ https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9480: -- Component/s: State Backends, Checkpointing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9480) Let local recovery support rescaling
[ https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494880#comment-16494880 ] Sihua Zhou commented on FLINK-9480: --- [~stefanrichte...@gmail.com] What do you think of this? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9480) Let local recovery support rescaling
Sihua Zhou created FLINK-9480: - Summary: Let local recovery support rescaling Key: FLINK-9480 URL: https://issues.apache.org/jira/browse/FLINK-9480 Project: Flink Issue Type: Improvement Reporter: Sihua Zhou Currently, local recovery only supports restoring from a checkpoint, without rescaling. Maybe we should enable it to support rescaling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9479) Let the rescale API to use local recovery
Sihua Zhou created FLINK-9479: - Summary: Let the rescale API to use local recovery Key: FLINK-9479 URL: https://issues.apache.org/jira/browse/FLINK-9479 Project: Flink Issue Type: Improvement Components: REST, State Backends, Checkpointing Affects Versions: 1.5.0 Reporter: Sihua Zhou Currently, Flink's online rescale API operates as follows: - trigger a savepoint for the job - rescale the job from the savepoint We should improve it to use local recovery, to speed it up and reduce the network pressure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9475) introduce an approximate version of "select distinct"
Sihua Zhou created FLINK-9475: - Summary: introduce an approximate version of "select distinct" Key: FLINK-9475 URL: https://issues.apache.org/jira/browse/FLINK-9475 Project: Flink Issue Type: New Feature Components: Table API SQL Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou Based on the "Elastic Bloom Filter", it is easy to implement an approximate version of "select distinct" that has excellent performance. Its accuracy should be configurable, e.g. 95%, 98%. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9474) Introduce an approximate version of "count distinct"
Sihua Zhou created FLINK-9474: - Summary: Introduce an approximate version of "count distinct" Key: FLINK-9474 URL: https://issues.apache.org/jira/browse/FLINK-9474 Project: Flink Issue Type: New Feature Components: Table API SQL Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou We can implement an approximate version of "count distinct" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore; its accuracy should be configurable, e.g. 95%, 98%. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
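To illustrate the idea of a filter-based approximate "count distinct" (this is a toy plain-Java sketch, not the proposed Elastic Bloom Filter, which is scalable and checkpointed): a Bloom filter lets us count a key as new only when the filter has definitely not seen it, so the estimate can undercount (on false positives) but never overcount distinct keys. Choosing the bit-array size and hash count is what makes the accuracy configurable.

```java
import java.util.BitSet;

// Toy sketch of approximate "count distinct" backed by a Bloom filter.
// Real implementations derive `size` and `numHashes` from a target
// false-positive rate (e.g. 2% or 5%); this is illustrative only.
class ApproxDistinctCounter {
    private final BitSet bits;
    private final int size;
    private final int numHashes;
    private long count;

    ApproxDistinctCounter(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    // Derive several bit positions from two base hashes (double hashing).
    private int position(Object key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(Object key) {
        boolean mightContain = true;
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                mightContain = false;
            }
            bits.set(position(key, i));
        }
        if (!mightContain) {
            count++; // only count keys the filter has definitely not seen
        }
    }

    // Can undercount (false positives), never overcounts distinct keys.
    long estimate() {
        return count;
    }
}
```

The speed-up claimed in the ticket comes from the same property: the counter never touches keyed state at all, only the in-memory bit array.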
[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink
[ https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9364: -- Summary: Add doc of the memory usage in flink (was: Add doc for the memory usage in flink) > Add doc of the memory usage in flink > > > Key: FLINK-9364 > URL: https://issues.apache.org/jira/browse/FLINK-9364 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > We need to add a doc to describe the memory usage in Flink, especially when > people use the RocksDBBackend; many people get confused by it (I've seen > several questions related to this on the user mailing list). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly
Sihua Zhou created FLINK-9468: - Summary: get outputLimit of LimitedConnectionsFileSystem incorrectly Key: FLINK-9468 URL: https://issues.apache.org/jira/browse/FLINK-9468 Project: Flink Issue Type: Bug Components: FileSystem Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou Fix For: 1.6.0, 1.5.1 In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit incorrectly. {code:java} private T createStream( final SupplierWithException streamOpener, final HashSet openStreams, final boolean output) throws IOException { final int outputLimit = output && maxNumOpenInputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE; /**/ } {code} should be {code:java} private T createStream( final SupplierWithException streamOpener, final HashSet openStreams, final boolean output) throws IOException { final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE; /**/ } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9455: - Assignee: Sihua Zhou > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9456: - Assignee: Sihua Zhou > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8205) Multi key get
[ https://issues.apache.org/jira/browse/FLINK-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492731#comment-16492731 ] Sihua Zhou commented on FLINK-8205: --- Hi [~kkl0u] are you still willing to help with the design and the reviews for this ticket? > Multi key get > - > > Key: FLINK-8205 > URL: https://issues.apache.org/jira/browse/FLINK-8205 > Project: Flink > Issue Type: New Feature > Components: Queryable State >Affects Versions: 1.4.0 > Environment: Any >Reporter: Martin Eden >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Currently the Java queryable state API only allows fetching one key at a > time. It would be extremely useful and more efficient if a similar call > existed for submitting multiple keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492600#comment-16492600 ] Sihua Zhou commented on FLINK-8601: --- Hi [~aljoscha] could you please have a look at this? I updated the doc to the latest version. > Introduce ElasticBloomFilter for Approximate calculation and other situations > of performance optimization > - > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > h2. *Motivation* > There are some scenarios that drive us to introduce this ElasticBloomFilter: one > is stream join, another is data deduplication, and some special user > cases... This has given us great results; for example, we implemented > the Runtime Filter Join based on it, and it gave us a great performance > improvement. This feature differentiates us from the "normal stream join" and > allows us to improve performance while reducing resource consumption by about > half!!! > I will list the two most typical user cases that are optimized by the > ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data > Deduplication" in brief. > *Scenario 1: Runtime Filter Join* > In general, stream join is one of the most performance-costly tasks. For every > record from either side, we need to query the state of the other side; this leads > to poor performance when the state size is huge. So, in production, > we always need to spend a lot of slots to handle stream joins. But, indeed, we > can improve this somewhat; there is a phenomenon of stream joins that can be found > in production: the "joined ratio" of the stream join is often very > low. For example: > - stream join in promotion analysis: the job needs to join the promotion log with > the action (click, view, buy) log on the promotion_id to analyze the effect > of the promotion. > - stream join in AD (advertising) attribution: the job needs to join the AD click > log with the item payment log on the click_id to find which click of which AD > brought the payment, to do attribution. > - stream join in click log analysis of docs: the job needs to join the viewed log (docs > viewed by users) with the click log (docs clicked by users) to analyze the > reason for the click and the properties of the users. > - ... and so on > All these cases have one common property: the joined ratio is very low. Here is > an example to describe it: we have 1 records from the left > stream and 1 records from the right stream, and we execute select * > from leftStream l join rightStream r on l.id = r.id, but we only get 100 records > in the result. That is a case of low joined ratio; this is an example > of an inner join, but it also applies to left & right joins. > There are more examples of low joined ratio I could come up with... but the point I > want to raise is that a low joined ratio for stream joins in production is > a very common phenomenon (maybe even the most common phenomenon in some > companies; at least in our company that is the case). > *How to improve this?* > We can see from the above case that 1 records joined with 1 records > yield only 100 results. That means we query the state 2 times (1 for the > left stream and 1 for the right stream) but only 100 of those queries are > meaningful!!! If we can reduce the number of useless queries, then we can > definitely improve the performance of stream joins. > The way we improve this is to introduce the Runtime Filter Join; the > main idea is that we build a filter for the state on each side (left > stream & right stream). When we need to query the state on one side, we first > check the corresponding filter to see whether the key can possibly be in the state; if > the filter says "no, it can't possibly be in the state", then we skip querying the > state; if it says "hmm, it may be in the state", then we query the state. > As you can see, the best choice of filter is the Bloom filter: it has all the > features that we want: extremely good performance, and no false > negatives. > The simplest pseudo code for the Runtime Filter Join (the comments are based on > the RocksDBBackend): > {code:java} > void performJoinNormally(Record recordFromLeftStream) { > Iterator rightIterator = rightStreamState.iterator(); > // perform the `seek()` on RocksDB, and iterate one by one; > // this is an expensive operation, especially when the key can't be > // found in RocksDB. > for (Record recordFromRightState : rightIterator) { > ……... > } > } > > void
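The filter-gated lookup described above can be sketched in plain Java, with a single-hash bit filter standing in for the proposed ElasticBloomFilter (illustrative only; the real filter would be a proper multi-hash Bloom filter, partitioned and checkpointed, and the map stands in for RocksDB state):

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// One side of the join: a filter guards the (expensive) state lookup.
// The HashMap stands in for the RocksDB state; stateLookups counts how
// often we actually had to touch it.
class FilteredJoinSide<K, V> {
    private static final int FILTER_BITS = 1 << 16;
    private final Map<K, V> state = new HashMap<>();
    private final BitSet filter = new BitSet(FILTER_BITS);
    long stateLookups;

    private static int bit(Object key) {
        return Math.floorMod(key.hashCode(), FILTER_BITS);
    }

    void put(K key, V value) {
        state.put(key, value);
        filter.set(bit(key)); // every stored key sets its bit: no false negatives
    }

    // Returns the matching record or null. If the filter says
    // "definitely absent", the state lookup is skipped entirely.
    V probe(K key) {
        if (!filter.get(bit(key))) {
            return null;
        }
        stateLookups++;
        return state.get(key);
    }
}
```

With a low joined ratio, most probes hit the "definitely absent" branch, which is exactly what saves the RocksDB `seek()` calls the pseudocode's comment describes.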
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9410: - Assignee: mingleizhang (was: Sihua Zhou) > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > Fix For: 1.6.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully
[ https://issues.apache.org/jira/browse/FLINK-9351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9351: - Assignee: Sihua Zhou > RM stop assigning slot to Job because the TM killed before connecting to JM > successfully > > > Key: FLINK-9351 > URL: https://issues.apache.org/jira/browse/FLINK-9351 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0 > > > The steps are the following(copied from Stephan's comments in > [5931|https://github.com/apache/flink/pull/5931]): > - JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager > - ResourceManager starts a container with a TaskManager > - TaskManager registers at ResourceManager, which tells the TaskManager to > push a slot to the JobManager. > - TaskManager container is killed > - The ResourceManager does not queue back the slot requests (AllocationIDs) > that it sent to the previous TaskManager, so the requests are lost and need > to time out before another attempt is tried. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9426) Harden RocksDBWriteBatchPerformanceTest.benchMark()
[ https://issues.apache.org/jira/browse/FLINK-9426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487276#comment-16487276 ] Sihua Zhou commented on FLINK-9426: --- one instance: https://travis-ci.org/apache/flink/jobs/382584476 > Harden RocksDBWriteBatchPerformanceTest.benchMark() > --- > > Key: FLINK-9426 > URL: https://issues.apache.org/jira/browse/FLINK-9426 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0, 1.5.1 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > We use an assert to check that WriteBatch performs better than Put(); this should be true in general, but it can sometimes be false. We may need to follow the other tests under *org.apache.flink.contrib.streaming.state.benchmark.** and only use the timeout property to validate the test. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9426) Harden RocksDBWriteBatchPerformanceTest.benchMark()
Sihua Zhou created FLINK-9426: - Summary: Harden RocksDBWriteBatchPerformanceTest.benchMark() Key: FLINK-9426 URL: https://issues.apache.org/jira/browse/FLINK-9426 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.0, 1.5.1 Reporter: Sihua Zhou Assignee: Sihua Zhou Fix For: 1.6.0 We use an assert to check that WriteBatch performs better than Put(); this should be true in general, but it can sometimes be false. We may need to follow the other tests under *org.apache.flink.contrib.streaming.state.benchmark.** and only use the timeout property to validate the test. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread
[ https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9417: - Assignee: Sihua Zhou > Send heartbeat requests from RPC endpoint's main thread > --- > > Key: FLINK-9417 > URL: https://issues.apache.org/jira/browse/FLINK-9417 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > > Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat > requests to remote targets. This has the problem that we still see heartbeats > from this endpoint also if its main thread is currently blocked. Due to this, > the heartbeat response cannot be processed and the remote target times out. > On the remote side, this won't be noticed because it still receives the > heartbeat requests. > A solution to this problem would be to send the heartbeat requests to the > remote thread through the RPC endpoint's main thread. That way, also the > heartbeats would be blocked if the main thread is blocked/busy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8601: -- Description: h2. *Motivation* Several scenarios drive us to introduce this ElasticBloomFilter: one is stream join, another is data deduplication, plus some special user cases... It has served us well; for example, we implemented the Runtime Filter Join on top of it, and it gives us a great performance improvement. This feature sets us apart from the "normal stream join": it lets us improve performance while cutting resource consumption by about half! I will describe the two most typical use cases optimized by the ElasticBloomFilter: "Runtime Filter Join" in detail and "Data Deduplication" in brief. *Scenario 1: Runtime Filter Join* In general, a stream join is one of the most performance-costly tasks. For every record from either side, we need to query the state of the other side, which leads to poor performance when the state is huge. So, in production, we always need to spend a lot of slots on stream joins. But we can improve this, because one phenomenon of stream joins shows up repeatedly in production: the "joined ratio" of a stream join is often very low. For example: - stream join in promotion analysis: the job joins the promotion log with the action (click, view, buy) log on the promotion_id to analyze the effect of the promotion. - stream join in AD (advertising) attribution: the job joins the AD click log with the item payment log on the click_id to find which click of which AD brought the payment, for attribution. - stream join in doc click-log analysis: the job joins the viewed log (docs viewed by users) with the click log (docs clicked by users) to analyze the reason for the click and the properties of the users. - ...and so on. All these cases share one property: the joined ratio is very low.
Here is an example: say we join a large number of records from the left stream with a comparable number from the right stream via select * from leftStream l join rightStream r on l.id = r.id, and only 100 records come out of the result; that is a low joined ratio. The example is an inner join, but the same applies to left and right joins. There are more low-joined-ratio examples, but the point I want to raise is that a low joined ratio is a very common phenomenon for stream joins in production (perhaps the most common one in some companies; at least it is in ours). *How to improve this?* In the case above, nearly every record on each side triggers a state query, yet only 100 of those queries are meaningful! If we can reduce the useless queries, we can definitely improve the performance of the stream join. The way we improve this is to introduce the Runtime Filter Join. The main idea is to build a filter for the state on each side (left stream & right stream). Before querying the state on one side, we first ask the corresponding filter whether the key can be in the state. If the filter says "no, it cannot be in the state", we skip the state query; if it says "it may be in the state", we query the state as usual. As you can see, the best choice of filter is a Bloom filter: it has all the features we want, extremely good performance and no false negatives. The simplest pseudo code for the Runtime Filter Join (the comments assume the RocksDB backend): {code:java} void performJoinNormally(Record recordFromLeftStream) { Iterator rightIterator = rightStreamState.iterator(); // performs a `seek()` on RocksDB and iterates entry by entry; // this is expensive, especially when the key cannot be found in RocksDB.
for (Record recordFromRightState : rightIterator) { ... } } void performRuntimeFilterJoin(Record recordFromLeftStream) { Iterator rightIterator = EMPTY_ITERATOR; if (rightStreamFilter.containsCurrentKey()) { rightIterator = rightStreamState.iterator(); } // performs the `seek()` only when containsCurrentKey() returns true for (Record recordFromRightState : rightIterator) { ... } // add the current key to the filter of the left stream. leftStreamFilter.addCurrentKey(); } {code} *Scenario 2: Data Deduplication* We have implemented two general functions based on the ElasticBloomFilter: count(distinct x) and select distinct x, y, z from table. Unlike the Runtime Filter Join, the result of these two functions is approximate, not exact. They are used in scenarios where we don't need a 100% accurate
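The approximate count(distinct x) described in Scenario 2 can be sketched as follows: a value is counted only if the Bloom filter has never "seen" it, so false positives can only undercount, which is why the result is approximate rather than exact. The filter here is a minimal hand-rolled one and all names are illustrative, not the actual ElasticBloomFilter API.

```java
import java.util.BitSet;

// Sketch of an approximate count(distinct x) backed by a Bloom filter.
// A value is counted only when the filter has not seen it before; a
// false positive skips the count, so the result can only undercount.
// Names are illustrative; this is not Flink's ElasticBloomFilter.
public class ApproxDistinctCount {
    static final int BITS = 1 << 20;
    final BitSet bits = new BitSet(BITS);
    long distinct = 0;

    // Two probe positions derived from the value's hash.
    private int[] probes(Object value) {
        int h = value.hashCode();
        return new int[]{(h & 0x7fffffff) % BITS, ((h * 31 + 17) & 0x7fffffff) % BITS};
    }

    void add(Object value) {
        boolean seen = true;
        for (int p : probes(value)) seen &= bits.get(p);
        if (!seen) {                               // first time (up to false positives)
            distinct++;
            for (int p : probes(value)) bits.set(p);
        }
    }
}
```

The appeal over an exact distinct count is that memory stays bounded by the bit array regardless of how many distinct values arrive, at the cost of a small, tunable undercount.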
[jira] [Updated] (FLINK-9070) Improve performance of RocksDBMapState.clear()
[ https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9070: -- Affects Version/s: (was: 1.6.0) 1.5.0 > Improve performance of RocksDBMapState.clear() > -- > > Key: FLINK-9070 > URL: https://issues.apache.org/jira/browse/FLINK-9070 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Truong Duc Kien >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > Currently, RocksDBMapState.clear() is implemented by iterating over all the > keys and dropping them one by one. This iteration can be quite slow for: > * Large maps > * High-churn maps with a lot of tombstones > There are a few methods to speed up deletion for a range of keys, each with > its own caveats: > * DeleteRange: still experimental, likely buggy > * DeleteFilesInRange + CompactRange: only good for large ranges > > Flink can also keep a list of inserted keys in memory, then delete > them directly without having to iterate over the RocksDB database again. > > Reference: > * [RocksDB article about range > deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys] > * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
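The ticket's last suggestion, keeping a list of inserted keys in memory so clear() can delete them directly, can be sketched like this. A HashMap stands in for RocksDB here to keep the sketch self-contained; a real implementation would call RocksDB's delete for each tracked key. All names are illustrative, not Flink's RocksDBMapState API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the "keep inserted keys in memory" idea from the ticket:
// clear() deletes exactly the tracked keys instead of iterating the
// whole column family. A HashMap stands in for RocksDB; a real
// implementation would issue a RocksDB delete per tracked key.
public class TrackedMapStateSketch {
    final Map<String, String> db = new HashMap<>();   // stand-in for RocksDB
    final Set<String> insertedKeys = new HashSet<>(); // in-memory key list

    void put(String key, String value) {
        db.put(key, value);
        insertedKeys.add(key);  // track every key this state ever wrote
    }

    void clear() {
        for (String key : insertedKeys) {
            db.remove(key);     // real code: delete(columnFamily, keyBytes)
        }
        insertedKeys.clear();
    }
}
```

The trade-off is extra heap proportional to the number of live keys, in exchange for a clear() that never scans tombstones.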
[jira] [Updated] (FLINK-9070) Improve performance of RocksDBMapState.clear()
[ https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9070: -- Fix Version/s: (was: 1.6.0) 1.5.0 > Improve performance of RocksDBMapState.clear() > -- > > Key: FLINK-9070 > URL: https://issues.apache.org/jira/browse/FLINK-9070 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Truong Duc Kien >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > Currently, RocksDBMapState.clear() is implemented by iterating over all the > keys and dropping them one by one. This iteration can be quite slow for: > * Large maps > * High-churn maps with a lot of tombstones > There are a few methods to speed up deletion for a range of keys, each with > its own caveats: > * DeleteRange: still experimental, likely buggy > * DeleteFilesInRange + CompactRange: only good for large ranges > > Flink can also keep a list of inserted keys in memory, then delete > them directly without having to iterate over the RocksDB database again. > > Reference: > * [RocksDB article about range > deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys] > * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9410: - Assignee: Sihua Zhou > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-9401. - Resolution: Invalid > Data lost when rescaling the job from incremental checkpoint > > > Key: FLINK-9401 > URL: https://issues.apache.org/jira/browse/FLINK-9401 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0, 1.4.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > > We may lose data when rescaling a job from an incremental checkpoint because of > the following code. > {code:java} > try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, > columnFamilyHandle)) { >    int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); >    byte[] startKeyGroupPrefixBytes = new > byte[stateBackend.keyGroupPrefixBytes]; >    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { >      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> > ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); >    } >    iterator.seek(startKeyGroupPrefixBytes); >    while (iterator.isValid()) { >      int keyGroup = 0; >      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { >        keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; >      } >      if (stateBackend.keyGroupRange.contains(keyGroup)) { >        stateBackend.db.put(targetColumnFamilyHandle, > iterator.key(), iterator.value()); >      } >      iterator.next(); >    } > } > {code} > For every state handle, to fetch the target data we call > _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ can become > INVALID immediately if the state handle's _start key group_ is bigger than > _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint
Sihua Zhou created FLINK-9401: - Summary: Data lost when rescaling the job from incremental checkpoint Key: FLINK-9401 URL: https://issues.apache.org/jira/browse/FLINK-9401 Project: Flink Issue Type: Bug Affects Versions: 1.4.2, 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou We may lose data when rescaling a job from an incremental checkpoint because of the following code. {code:java} try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) { int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); } iterator.seek(startKeyGroupPrefixBytes); while (iterator.isValid()) { int keyGroup = 0; for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; } if (stateBackend.keyGroupRange.contains(keyGroup)) { stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); } iterator.next(); } } {code} For every state handle, to fetch the target data we call _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ can become INVALID immediately if the state handle's _start key group_ is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
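The restore loop above does two transformations: it serializes the start key group into big-endian prefix bytes, then parses the key group back out of each RocksDB key. The pair can be sketched and round-trip-checked in isolation. One subtlety worth noting: the parse direction needs a `& 0xFF` mask, because Java bytes are signed and any encoded byte above 0x7F would otherwise corrupt the reconstructed key group. Names here are illustrative, not Flink internals.

```java
// Sketch of the key-group prefix arithmetic from the restore loop:
// serialize a key-group id into big-endian prefix bytes and parse it
// back. The parse masks with 0xFF; without the mask, Java's signed
// bytes corrupt any key group whose encoded bytes exceed 0x7F.
public class KeyGroupPrefix {
    // Big-endian encoding, most significant byte first.
    static byte[] serialize(int keyGroup, int prefixBytes) {
        byte[] out = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; ++j) {
            out[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return out;
    }

    // Inverse of serialize(): rebuild the key group from the key's prefix.
    static int parse(byte[] keyBytes, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + (keyBytes[j] & 0xFF);
        }
        return keyGroup;
    }
}
```

With the round trip verified, the remaining question in the ticket is purely about where the seek starts relative to each handle's key-group range.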
[jira] [Commented] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs
[ https://issues.apache.org/jira/browse/FLINK-9375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480041#comment-16480041 ] Sihua Zhou commented on FLINK-9375: --- Hi [~srichter] [~yanghua] This looks a bit like a duplicate of [FLINK-8871|https://issues.apache.org/jira/browse/FLINK-8871], which needs a good discussion as you ([~srichter]) have mentioned; or should [FLINK-8871|https://issues.apache.org/jira/browse/FLINK-8871] be blocked by this ticket (which only covers the RPC-related work)? Maybe we should link these two issues together to get a better picture... what do you think? > Introduce AbortCheckpoint message from JM to TMs > > > Key: FLINK-9375 > URL: https://issues.apache.org/jira/browse/FLINK-9375 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Major > > We should introduce an {{AbortCheckpoint}} message that a jobmanager can send > to taskmanagers if a checkpoint is canceled so that the operators can eagerly > stop their alignment phase and continue normal processing. This can reduce > some backpressure issues in the context of canceled and restarted checkpoints. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479462#comment-16479462 ] Sihua Zhou commented on FLINK-9373: --- [~srichter] FYI [3558|https://github.com/facebook/rocksdb/issues/3558], got a reply from RocksDB. I think we chose the right way to go ;), because the status can be reset. > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > Currently, when using a RocksIterator we only use _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough: per RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling, even if > _iterator.isValid()_ is true, there may also be some internal error. A safer > way to use the _RocksIterator_ is to always call > _iterator.status()_ to check for internal errors of _RocksDB_. There is a case > from the user mailing list that seems to have lost data because of this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478865#comment-16478865 ] Sihua Zhou commented on FLINK-9373: --- I will update the PR shortly. > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Currently, when using a RocksIterator we only use _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough: per RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling, even if > _iterator.isValid()_ is true, there may also be some internal error. A safer > way to use the _RocksIterator_ is to always call > _iterator.status()_ to check for internal errors of _RocksDB_. There is a case > from the user mailing list that seems to have lost data because of this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478848#comment-16478848 ] Sihua Zhou commented on FLINK-9373: --- I think that makes sense. > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Currently, when using a RocksIterator we only use _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough: per RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling, even if > _iterator.isValid()_ is true, there may also be some internal error. A safer > way to use the _RocksIterator_ is to always call > _iterator.status()_ to check for internal errors of _RocksDB_. There is a case > from the user mailing list that seems to have lost data because of this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
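The iteration pattern FLINK-9373 asks for can be sketched without a native RocksDB dependency by stubbing the iterator shape: when isValid() turns false, we must still call status(), because "not valid" can mean either normal exhaustion or a swallowed internal error. The stub below is illustrative; the real org.rocksdb.RocksIterator#status throws a checked RocksDBException, modeled here as an unchecked exception to keep the sketch small.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the safe RocksIterator usage from the RocksDB wiki.
public class SafeRocksIteration {
    // Mimics the RocksIterator shape (illustrative stub only).
    interface KvIterator {
        boolean isValid();
        void next();
        void status(); // throws RuntimeException if an internal error occurred
    }

    // isValid() == false can mean exhaustion OR a swallowed error, so we
    // call status() after the loop instead of trusting the iterator.
    static int countEntriesSafely(KvIterator it) {
        int n = 0;
        while (it.isValid()) {
            n++;
            it.next();
        }
        it.status(); // surfaces internal errors instead of silently losing data
        return n;
    }

    // Builds a stub over two sample entries; optionally simulates an
    // internal error after the first entry (isValid() flips false early).
    static KvIterator stub(boolean failAfterFirst) {
        Map<String, String> data = new TreeMap<>();
        data.put("a", "1");
        data.put("b", "2");
        Iterator<String> keys = data.keySet().iterator();
        return new KvIterator() {
            String current = keys.next();
            boolean failed = false;
            public boolean isValid() { return current != null; }
            public void next() {
                if (failAfterFirst) { failed = true; current = null; return; }
                current = keys.hasNext() ? keys.next() : null;
            }
            public void status() {
                if (failed) throw new RuntimeException("internal iterator error");
            }
        };
    }
}
```

Without the trailing status() call, the failure case below would look exactly like a clean end-of-data, which is the data-loss mode the ticket describes.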
[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477433#comment-16477433 ] Sihua Zhou commented on FLINK-8918: --- [~fhueske] regarding the saturation problem, I want to add some more information. In the ElasticBloomFilter, Bloom filters are allocated lazily and can scale. But yes, we should discuss this more deeply on FLINK-8601. > Introduce Runtime Filter Join > - > > Key: FLINK-8918 > URL: https://issues.apache.org/jira/browse/FLINK-8918 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > In general, a stream join is one of the most performance-costly tasks. For every > record from either side, we need to query the state of the other side, which > leads to poor performance when the state is huge. So, in production, > we always need to spend a lot of slots on stream joins. But we > can improve this, because one phenomenon of stream joins shows up repeatedly > in production: the `joined ratio` of a stream join is often very > low. For example: > - stream join in promotion analysis: the job joins the promotion log with > the action (click, view, payment, collection, retweet) log on the > `promotion_id` to analyze the effect of the promotion. > - stream join in AD (advertising) attribution: the job joins the AD click > log with the item payment log on the `click_id` to find which click of which > AD brought the payment, for attribution. > - stream join in doc click-log analysis: the job joins the viewed log (docs > viewed by users) with the click log (docs clicked by users) to analyze the > reason for the click and the properties of the users. > - ...and so on. > All these cases have one common property: the _joined ratio_ is very > low.
Here is an example: imagine we join a large number of records > from the left stream with a comparable number from the right stream via > _select * from leftStream l join rightStream r on l.id = r.id_ , and we only get > 100 records in the result; that is a low _joined ratio_. This is an example > for an inner join, but the same applies to left & right joins. > There are more low-_joined ratio_ examples, but the most > important point I want to express is that a low _joined ratio_ for stream > joins in production is a very common phenomenon (perhaps the most common > phenomenon in some companies; at least in our company that is the case). > *Then how to improve it?* > In the case above, nearly every record on each side triggers a > state query, yet only 100 of those queries are meaningful!!! > If we can reduce the useless queries, then we can definitely improve > the performance of the stream join. > The way we improve this is to introduce the _Runtime Filter Join_; > the main idea is to build a _filter_ for the state on each side > (left stream & right stream). Before querying the state on that side we > first check the corresponding _filter_ whether the _key_ can be in the > state: if the _filter_ says "no, it cannot be in the state", we skip > querying the state; if it says "it may be in the state", we query > the state. As you can see, the best choice of _filter_ is a _Bloom filter_: > it has all the features we expect, _extremely good performance_ and > _non-existence of false negatives_. 
> > *The simplest pseudo code for the _Runtime Filter Join_ (the inline comments > assume the RocksDB backend)* > {code:java} > void performJoinNormally(Record recordFromLeftStream) { > Iterator rightIterator = rightStreamState.iterator(); > // performs a `seek()` on RocksDB and iterates entry by entry; > // this is expensive, especially when the key cannot be found > in RocksDB. > for (Record recordFromRightState : rightIterator) { > ... > } > } > void performRuntimeFilterJoin(Record recordFromLeftStream) { > Iterator rightIterator = EMPTY_ITERATOR; > if (rightStreamFilter.containsCurrentKey()) { > rightIterator = rightStreamState.iterator(); > } > // performs the `seek()` only when containsCurrentKey() returns true > for (Record recordFromRightState : rightIterator) { > ... > } > > // add the current key to the filter of the left stream. > leftStreamFilter.addCurrentKey(); > } > {code} > A description of runtime filtering for batch joins can be found > [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] >
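The description leans on the Bloom filter's guarantee of no false negatives; the flip side is a tunable false-positive rate, classically estimated as p ≈ (1 − e^(−k·n/m))^k for k hash functions, n inserted keys and m bits. This is the standard textbook approximation, evaluated here as a small helper (not a Flink API), which makes it easy to see how the saturation problem discussed in the comment arises as n grows against a fixed m.

```java
// Classic Bloom filter false-positive estimate p ≈ (1 - e^(-k*n/m))^k
// for k hash functions, n inserted keys and m bits. As n grows against
// a fixed m, the filter saturates and p climbs toward 1, which is the
// saturation concern raised in the comment above.
public class BloomFpp {
    static double falsePositiveRate(int numHashes, long numKeys, long numBits) {
        double fractionSet = 1.0 - Math.exp(-(double) numHashes * numKeys / numBits);
        return Math.pow(fractionSet, numHashes);
    }
}
```

For example, at roughly 10 bits per key with 7 hashes the estimate stays under one percent, while letting the key count grow fivefold against the same bit budget drives it up sharply, which is why a scalable (elastic) filter matters.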
[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477429#comment-16477429 ] Sihua Zhou commented on FLINK-8918: --- Hi [~fhueske], sorry, I definitely wasn't asking you to promise to take this into 1.6... I just wanted to know that the community has no obvious objection to this feature before I start working on it. I agree with you that we should have a discussion about [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] first, and I would appreciate it very much if you could join that discussion! Thanks! Best, Sihua
> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
[jira] [Comment Edited] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477373#comment-16477373 ] Sihua Zhou edited comment on FLINK-8918 at 5/16/18 12:49 PM: - Hi [~fhueske], thank you very much for your reply. You are right that there are a few challenges with using a bloom filter to implement the Runtime Filter Join. I have created a separate JIRA, [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] (which this issue is blocked by), to address them. In that ticket I propose to introduce an _ElasticBloomFilter_ which could handle the questions you mentioned above. There is also a Google doc related to it, but it is a bit outdated now... I'm writing a new one and preparing to start a DISCUSSION on the dev mailing list once 1.5 is released. Here I'd like to answer your questions one by one in a brief version; we can discuss the details on the dev mailing list later:
> Bloom Filter Data Structure: it is not possible to remove a key from a bloom filter
Yes, removing a record from the Bloom Filter is impossible, but we can implement a relaxed TTL for the data, which could help to release the memory.
> Checkpoint, Recovery, Rescaling
The ElasticBloomFilter (which, as you've pointed out, is scoped to key groups) introduced in [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] will support checkpointing, recovery, and rescaling. Moreover, it even supports handling data skew, which is also a big problem when using a bloom filter in Flink. Additionally, with that type of ElasticBloomFilter there are some other interesting things we can build on top of it... What do you think? Do you object to this going into 1.6? If not, I am going to start a DISCUSSION on the dev mailing list once 1.5 is released (as I mentioned above)... Best, Sihua
> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
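The "relaxed TTL" mentioned in the comment above is not spelled out in this thread. One common way to approximate expiry on top of a Bloom filter (a hypothetical sketch, not the actual FLINK-8601 design) is to keep two filter generations and rotate them, so a key is forgotten somewhere between one and two TTL periods after its last insertion:

```java
import java.util.BitSet;

// Approximate TTL for a Bloom filter via two rotating generations:
// keys are added to the current filter; lookups consult both generations.
// Every `ttlMillis` the older generation is dropped wholesale, which is the
// "relaxed" part: expiry is per generation, not per key.
class RotatingBloomFilter {
    private BitSet current;
    private BitSet previous;
    private final int size;
    private final int numHashes;
    private final long ttlMillis;
    private long lastRotation;

    RotatingBloomFilter(int size, int numHashes, long ttlMillis, long nowMillis) {
        this.size = size;
        this.numHashes = numHashes;
        this.ttlMillis = ttlMillis;
        this.current = new BitSet(size);
        this.previous = new BitSet(size);
        this.lastRotation = nowMillis;
    }

    private void maybeRotate(long nowMillis) {
        if (nowMillis - lastRotation >= ttlMillis) {
            previous = current;          // demote: entries survive one more period
            current = new BitSet(size);  // fresh generation, memory is reclaimed
            lastRotation = nowMillis;
        }
    }

    private int hash(Object key, int seed) {
        return Math.floorMod(key.hashCode() * 31 + seed * 0x9E3779B9, size);
    }

    void add(Object key, long nowMillis) {
        maybeRotate(nowMillis);
        for (int i = 0; i < numHashes; i++) {
            current.set(hash(key, i));
        }
    }

    private boolean allSet(BitSet bs, Object key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bs.get(hash(key, i))) {
                return false;
            }
        }
        return true;
    }

    boolean mightContain(Object key, long nowMillis) {
        maybeRotate(nowMillis);
        // Possibly present if either generation contains all of the key's bits.
        return allSet(current, key) || allSet(previous, key);
    }
}
```

Within a generation the usual Bloom guarantee holds (no false negatives); across rotations a key only disappears after at least one full TTL period without re-insertion.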
[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477373#comment-16477373 ] Sihua Zhou commented on FLINK-8918: --- Hi [~fhueske], thank you very much for your reply. You are right that there are a few challenges with using a bloom filter to implement the Runtime Filter Join. I have created a separate JIRA, [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] (which this issue is blocked by), to address them. In that ticket I propose to introduce an _ElasticBloomFilter_ which could handle the questions you mentioned above. There is also a Google doc related to it, but it is a bit outdated now... I'm writing a new one and preparing to start a DISCUSSION on the dev mailing list once 1.5 is released. Here I'd like to answer your questions one by one in a brief version; we can discuss the details on the dev mailing list later:
> Bloom Filter Data Structure: it is not possible to remove a key from a bloom filter
Yes, removing a record from the Bloom Filter is impossible, but we can implement a relaxed TTL for the data, which could help to release the memory.
> Checkpoint, Recovery, Rescaling
The ElasticBloomFilter introduced in [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] will support checkpointing, recovery, and rescaling. Moreover, it even supports handling data skew, which is also a big problem when using a bloom filter in Flink. Additionally, with that type of ElasticBloomFilter there are some other interesting things we can build on top of it... What do you think? Do you object to this going into 1.6? If not, I am going to start a DISCUSSION on the dev mailing list once 1.5 is released (as I mentioned above)... Best, Sihua
> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9373: -- Fix Version/s: 1.5.0
> Fix potential data losing for RocksDBBackend
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: if we refer to RocksDB's wiki
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we find that even if _iterator.isValid()=true_, there may still be some internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
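The issue above is about a calling pattern, which can be illustrated without the RocksDB dependency. `KVIterator` below is a hypothetical stand-in for `org.rocksdb.RocksIterator` (whose real `status()` throws `RocksDBException`), so this compiles on its own; the point it demonstrates is that `isValid()` returning false can mean either end-of-data or an internal error, and only `status()` distinguishes the two:

```java
// Hypothetical stand-in for RocksDB's RocksIterator contract.
interface KVIterator {
    boolean isValid();
    void next();
    byte[] key();
    void status() throws Exception; // throws if an internal error occurred
}

class SafeIteration {
    // Unsafe pattern: treats !isValid() as end-of-data, so an iteration that
    // stopped early because of an internal error silently drops entries.
    static int countUnsafe(KVIterator it) {
        int n = 0;
        for (; it.isValid(); it.next()) {
            n++;
        }
        return n;
    }

    // Safe pattern: after the loop, check status() so that an error-induced
    // early stop surfaces as an exception instead of silent data loss.
    static int countSafe(KVIterator it) throws Exception {
        int n = 0;
        for (; it.isValid(); it.next()) {
            n++;
        }
        it.status(); // throws if iteration ended due to an internal error
        return n;
    }
}
```

The fix described in the ticket is essentially to apply the second pattern everywhere the backend iterates RocksDB state.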
[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9373: -- Summary: Fix potential data losing for RocksDBBackend (was: Always call RocksIterator.status() to check the internal error of RocksDB)
> Fix potential data losing for RocksDBBackend
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8918: -- Description: In general, a stream join is one of the most performance-costly tasks. For every record from either side, we need to query the state of the other side, which leads to poor performance when the state size is huge. So, in production, we always need to spend a lot of slots to handle stream joins. But we can improve this somewhat: there is a phenomenon of stream joins that can be observed in production, namely that the `joined ratio` of the stream join is often very low. For example:
- stream join in promotion analysis: the job needs to join the promotion log with the action (click, view, payment, collection, retweet) log on the `promotion_id` to analyze the effect of the promotion.
- stream join in AD (advertising) attribution: the job needs to join the AD click log with the item payment log on the `click_id` to find which click of which AD brought the payment, for attribution.
- stream join in click log analysis of docs: the job needs to join the viewed log (docs viewed by users) with the click log (docs clicked by users) to analyze the reason for the click and the properties of the users.
- ... and so on
All these cases have one property in common: the _joined ratio_ is very low. Here is an example to describe it: imagine we have 10,000 records from the left stream and 10,000 records from the right stream, and we execute _select * from leftStream l join rightStream r on l.id = r.id_, but we only get 100 records in the result. That is a case of low _joined ratio_. This example uses an inner join, but the same applies to left & right joins. There are more examples of low _joined ratio_ I could come up with, but the most important point I want to express is that a low _joined ratio_ for stream joins in production is a very common phenomenon (perhaps the most common one in some companies; at least in our company that is the case).
*Then how to improve it?*
We can see from the above case that joining 10,000 records with 10,000 records yields only 100 results. That means we query the state 20,000 times (10,000 for the left stream and 10,000 for the right stream), but only 100 of those queries are meaningful! If we could reduce the number of useless queries, we could definitely improve the performance of the stream join. The way we improve this is to introduce the _Runtime Filter Join_. The main idea is that we build a _filter_ for the state on each side (left stream & right stream). When we need to query the state on one side, we first check the corresponding _filter_ to see whether the _key_ can possibly be in the state. If the _filter_ says "no, it cannot be in the state", we skip querying the state; if it says "it may be in the state", we query the state. As you can see, the best choice of _filter_ is a _Bloom Filter_: it has all the features we expect, _extremely good performance_ and _no false negatives_.
*The simplest pseudo code for _Runtime Filter Join_ (the inline comments assume the RocksDB backend):*
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
    Iterator<Record> rightIterator = rightStreamState.iterator();
    // Performs a `seek()` on RocksDB and iterates one entry at a time;
    // this is an expensive operation, especially when the key cannot be found in RocksDB.
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        ...
    }
}

void performRuntimeFilterJoin(Record recordFromLeftStream) {
    Iterator<Record> rightIterator = EMPTY_ITERATOR;
    if (rightStreamFilter.containsCurrentKey()) {
        rightIterator = rightStreamState.iterator();
    }
    // The `seek()` is performed only when the filter reports the key as possibly present.
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        ...
    }

    // Add the current key to the filter of the left stream.
    leftStreamFilter.addCurrentKey();
}
{code}
A description of runtime filtering for batch joins can be found [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] (even though it was not originally written for stream joins, it can easily be applied to them).
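To size the per-side filters, the standard Bloom filter analysis applies (this is textbook material, not something specified in the ticket): with m bits, k hash functions, and n inserted keys, the false-positive probability and the optimal parameters are approximately

```latex
p \approx \left(1 - e^{-kn/m}\right)^{k},
\qquad
k_{\mathrm{opt}} = \frac{m}{n}\ln 2,
\qquad
m = -\frac{n \ln p}{(\ln 2)^{2}}.
```

For the 10,000-key example above with a target false-positive rate of p = 1%, this gives m of roughly 96,000 bits (about 12 KB) and k of about 7 hash functions, which is why the in-memory filter check is so much cheaper than a RocksDB `seek()`.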