[jira] [Commented] (FLINK-9364) Add doc of the memory usage in flink

2018-08-06 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571015#comment-16571015
 ] 

Sihua Zhou commented on FLINK-9364:
---

Hi [~till.rohrmann] I was catch up by some terrible work, so I didn't work on 
this recently, will get back once I have time off.

> Add doc of the memory usage in flink
> 
>
> Key: FLINK-9364
> URL: https://issues.apache.org/jira/browse/FLINK-9364
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> We need to add a doc to describe the memory usage in flink, especially when 
> people use the RocksDBBackend, many people get confuse because of that (I've 
> saw serval question related to this on the user emails).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink

2018-08-06 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9364:
--
Fix Version/s: (was: 1.6.0)
   1.7.0

> Add doc of the memory usage in flink
> 
>
> Key: FLINK-9364
> URL: https://issues.apache.org/jira/browse/FLINK-9364
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> We need to add a doc to describe the memory usage in flink, especially when 
> people use the RocksDBBackend, many people get confuse because of that (I've 
> saw serval question related to this on the user emails).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-12 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9804.
-
Resolution: Fixed

Merged in:

1.6.0: def2aed5c75b5a00815186d3343e66cb1dc01ac0

1.5.2: 8a564f82aa521670a0b6813c5deb65586b1fa136

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Aljoscha Krettek
>Assignee: Sihua Zhou
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>   final int namespace1ElementsNum = 1000;
>   final int namespace2ElementsNum = 1000;
>   String fieldName = "get-keys-test";
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE);
>   try {
>   final String ns1 = "ns1";
>   MapState keyedState1 = 
> backend.getPartitionedState(
>   ns1,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = 0; key < namespace1ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState1.put("he", key * 2);
>   keyedState1.put("ho", key * 2);
>   }
>   final String ns2 = "ns2";
>   MapState keyedState2 = 
> backend.getPartitionedState(
>   ns2,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = namespace1ElementsNum; key < 
> namespace1ElementsNum + namespace2ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState2.put("he", key * 2);
>   keyedState2.put("ho", key * 2);
>   }
>   // valid for namespace1
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns1).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = 0; expectedKey < 
> namespace1ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   // valid for namespace2
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns2).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = namespace1ElementsNum; 
> expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   }
>   finally {
>   IOUtils.closeQuietly(backend);
>   backend.dispose();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-11 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540077#comment-16540077
 ] 

Sihua Zhou commented on FLINK-9804:
---

[~aljoscha] Yes, I'm writing the PR description...

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>   final int namespace1ElementsNum = 1000;
>   final int namespace2ElementsNum = 1000;
>   String fieldName = "get-keys-test";
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE);
>   try {
>   final String ns1 = "ns1";
>   MapState keyedState1 = 
> backend.getPartitionedState(
>   ns1,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = 0; key < namespace1ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState1.put("he", key * 2);
>   keyedState1.put("ho", key * 2);
>   }
>   final String ns2 = "ns2";
>   MapState keyedState2 = 
> backend.getPartitionedState(
>   ns2,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = namespace1ElementsNum; key < 
> namespace1ElementsNum + namespace2ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState2.put("he", key * 2);
>   keyedState2.put("ho", key * 2);
>   }
>   // valid for namespace1
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns1).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = 0; expectedKey < 
> namespace1ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   // valid for namespace2
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns2).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = namespace1ElementsNum; 
> expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   }
>   finally {
>   IOUtils.closeQuietly(backend);
>   backend.dispose();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-11 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9804:
-

Assignee: Sihua Zhou  (was: vinoyang)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Aljoscha Krettek
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>   final int namespace1ElementsNum = 1000;
>   final int namespace2ElementsNum = 1000;
>   String fieldName = "get-keys-test";
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE);
>   try {
>   final String ns1 = "ns1";
>   MapState keyedState1 = 
> backend.getPartitionedState(
>   ns1,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = 0; key < namespace1ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState1.put("he", key * 2);
>   keyedState1.put("ho", key * 2);
>   }
>   final String ns2 = "ns2";
>   MapState keyedState2 = 
> backend.getPartitionedState(
>   ns2,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = namespace1ElementsNum; key < 
> namespace1ElementsNum + namespace2ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState2.put("he", key * 2);
>   keyedState2.put("ho", key * 2);
>   }
>   // valid for namespace1
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns1).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = 0; expectedKey < 
> namespace1ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   // valid for namespace2
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns2).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = namespace1ElementsNum; 
> expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   }
>   finally {
>   IOUtils.closeQuietly(backend);
>   backend.dispose();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

2018-07-11 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540034#comment-16540034
 ] 

Sihua Zhou commented on FLINK-9804:
---

Hi [~yanghua] do you start working on this already? I just finished this but 
forgot to assign it to myself. Anyway, this is my code 
([https://github.com/sihuazhou/flink/commit/efc4096a4a6f38b3acb0b5189804f7b452218f23]),
 hope this could somehow reduce your work or give you some help. ;)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>   final int namespace1ElementsNum = 1000;
>   final int namespace2ElementsNum = 1000;
>   String fieldName = "get-keys-test";
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE);
>   try {
>   final String ns1 = "ns1";
>   MapState keyedState1 = 
> backend.getPartitionedState(
>   ns1,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = 0; key < namespace1ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState1.put("he", key * 2);
>   keyedState1.put("ho", key * 2);
>   }
>   final String ns2 = "ns2";
>   MapState keyedState2 = 
> backend.getPartitionedState(
>   ns2,
>   StringSerializer.INSTANCE,
>   new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>   );
>   for (int key = namespace1ElementsNum; key < 
> namespace1ElementsNum + namespace2ElementsNum; key++) {
>   backend.setCurrentKey(key);
>   keyedState2.put("he", key * 2);
>   keyedState2.put("ho", key * 2);
>   }
>   // valid for namespace1
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns1).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = 0; expectedKey < 
> namespace1ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   // valid for namespace2
>   try (Stream keysStream = backend.getKeys(fieldName, 
> ns2).sorted()) {
>   PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>   for (int expectedKey = namespace1ElementsNum; 
> expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>   assertTrue(actualIterator.hasNext());
>   assertEquals(expectedKey, 
> actualIterator.nextInt());
>   }
>   assertFalse(actualIterator.hasNext());
>   }
>   }
>   finally {
>   IOUtils.closeQuietly(backend);
>   backend.dispose();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533506#comment-16533506
 ] 

Sihua Zhou commented on FLINK-9736:
---

Hi [~yuzhih...@gmail.com] AFAIK, {{heapOfKeyGroupHeaps}} will never be empty, 
as I mentioned above, it created in the constructor of  
{{KeyGroupPartitionedPriorityQueue}} to maintain the timer structure base on 
the heap of each key group. The number of elements in heapOfKeyGroupHeaps 
should be equal to the number of key groups in each Task, but as a double check 
I think maybe [~stefanrichte...@gmail.com] could help to confirm this.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> ---
>
> Key: FLINK-9736
> URL: https://issues.apache.org/jira/browse/FLINK-9736
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions

2018-07-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532982#comment-16532982
 ] 

Sihua Zhou commented on FLINK-9735:
---

I think this might be a trade-off of the design,

- 1) let the use create the option base on some basic configuration to get a 
good performance in flink (e.g. setFsync(false) because Flink does not rely on 
RocksDB data on disk for recovery).
- 2) give the chance for the use to create the option totally by themselves.

Given the design the user need to follow the rule it outlined, but I agree that 
OptionFactory's api is a bit easy to overlook, and lead the currentOptions 
parameter to be abandoned. Maybe a better interface could be provided to 
address this.

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ---
>
> Key: FLINK-9735
> URL: https://issues.apache.org/jira/browse/FLINK-9735
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Here is related code:
> {code}
> if (optionsFactory != null) {
>   opt = optionsFactory.createDBOptions(opt);
> }
> {code}
> opt, an DBOptions instance, should be closed before being rewritten.
> getColumnOptions has similar issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions

2018-07-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532956#comment-16532956
 ] 

Sihua Zhou commented on FLINK-9735:
---

Oh [~yuzhih...@gmail.com] you are right! The case in {{RocksDBResource}} should 
be fixed, it didn't follow the OptionsFactory's java doc.

So the true issue here is not related to the code
{code:java}
if (optionsFactory != null) {
  opt = optionsFactory.createDBOptions(opt);
}
{code}
in {{RocksDBStateBackend}}, but for the use case in {{RocksDBResource}}. Do you 
mind if I change this ticket's description and title to the true issue.

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ---
>
> Key: FLINK-9735
> URL: https://issues.apache.org/jira/browse/FLINK-9735
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Here is related code:
> {code}
> if (optionsFactory != null) {
>   opt = optionsFactory.createDBOptions(opt);
> }
> {code}
> opt, an DBOptions instance, should be closed before being rewritten.
> getColumnOptions has similar issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions

2018-07-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532728#comment-16532728
 ] 

Sihua Zhou commented on FLINK-9735:
---

Hi [~yuzhih...@gmail.com], I think this is a non issue, because the opt will be 
reused. If we have a look at the java doc of the 
{{OptionsFactory#createDBOptions()}} we can find that this is intended.

{code:java}
public interface OptionsFactory extends java.io.Serializable {

/**
 * This method should set the additional options on top of the current 
options object.
 * The current options object may contain pre-defined options based on 
flags that have
 * been configured on the state backend.
 *
 * It is important to set the options on the current object and 
return the result from
 * the setter methods, otherwise the pre-defined options may get lost.
 *
 * @param currentOptions The options object with the pre-defined 
options.
 * @return The options object on which the additional options are set.
 */
DBOptions createDBOptions(DBOptions currentOptions);

/**
 * This method should set the additional options on top of the current 
options object.
 * The current options object may contain pre-defined options based on 
flags that have
 * been configured on the state backend.
 *
 * It is important to set the options on the current object and 
return the result from
 * the setter methods, otherwise the pre-defined options may get lost.
 *
 * @param currentOptions The options object with the pre-defined 
options.
 * @return The options object on which the additional options are set.
 */
ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions);

}
{code}

What do you think?

> Potential resource leak in RocksDBStateBackend#getDbOptions
> ---
>
> Key: FLINK-9735
> URL: https://issues.apache.org/jira/browse/FLINK-9735
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Here is related code:
> {code}
> if (optionsFactory != null) {
>   opt = optionsFactory.createDBOptions(opt);
> }
> {code}
> opt, an DBOptions instance, should be closed before being rewritten.
> getColumnOptions has similar issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532726#comment-16532726
 ] 

Sihua Zhou commented on FLINK-9736:
---

Hi [~yuzhih...@gmail.com] I think this might look like a non issue, because 
`heapOfKeyGroupHeaps` is a heap-of-heap that created on the constructer of 
{{KeyGroupPartitionedPriorityQueue}} to maintain the timer structure base on 
the heap of each key group, and we never call `poll()` on it.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> ---
>
> Key: FLINK-9736
> URL: https://issues.apache.org/jira/browse/FLINK-9736
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully

2018-07-02 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9351.
-
Resolution: Duplicate

This issue have been fixed by the way in the PR of 
[FLINK-9456|https://issues.apache.org/jira/browse/FLINK-9456].

> RM stop assigning slot to Job because the TM killed before connecting to JM 
> successfully
> 
>
> Key: FLINK-9351
> URL: https://issues.apache.org/jira/browse/FLINK-9351
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> The steps are the following(copied from Stephan's comments in 
> [5931|https://github.com/apache/flink/pull/5931]):
> - JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager
> - ResourceManager starts a container with a TaskManager
> - TaskManager registers at ResourceManager, which tells the TaskManager to 
> push a slot to the JobManager.
> - TaskManager container is killed
> - The ResourceManager does not queue back the slot requests (AllocationIDs) 
> that it sent to the previous TaskManager, so the requests are lost and need 
> to time out before another attempt is tried.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9474) Add runtime support of distinct filter using ElasticBloomFilter

2018-06-27 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9474:
--
Description: We can implement an approximate version of "distinct filter" 
base on the "Elastic Bloom Filter", It could be very fast because we don't need 
to query the state anymore, its accuracy should could be configurable. e.g 95%, 
98% and it might unable to support retraction.  (was: We can implement an 
approximate version of "count distinct" base on the "Elastic Bloom Filter", It 
could be very fast because we don't need to query the state anymore, its 
accuracy should could be configurable. e.g 95%, 98%.)

> Add runtime support of distinct filter using ElasticBloomFilter
> ---
>
> Key: FLINK-9474
> URL: https://issues.apache.org/jira/browse/FLINK-9474
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We can implement an approximate version of "distinct filter" base on the 
> "Elastic Bloom Filter", It could be very fast because we don't need to query 
> the state anymore, its accuracy should could be configurable. e.g 95%, 98% 
> and it might unable to support retraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9474) Add runtime support of distinct filter using ElasticBloomFilter

2018-06-27 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9474:
--
Summary: Add runtime support of distinct filter using ElasticBloomFilter  
(was: Introduce an approximate version of "count distinct")

> Add runtime support of distinct filter using ElasticBloomFilter
> ---
>
> Key: FLINK-9474
> URL: https://issues.apache.org/jira/browse/FLINK-9474
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We can implement an approximate version of "count distinct" base on the 
> "Elastic Bloom Filter", It could be very fast because we don't need to query 
> the state anymore, its accuracy should could be configurable. e.g 95%, 98%.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-26 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9619:
--
Issue Type: Improvement  (was: Bug)

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-26 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9619:
--
Priority: Major  (was: Critical)

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9661) TTL state should support to do time shift after restoring from checkpoint( savepoint).

2018-06-25 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9661:
-

 Summary: TTL state should support to do time shift after restoring 
from checkpoint( savepoint).
 Key: FLINK-9661
 URL: https://issues.apache.org/jira/browse/FLINK-9661
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The initial version of the TTL-state appends the expired timestamp along the 
state record, and check the expired timestamp with the condition 
{{expired_timestamp <= current_time}} when accessing the state, if it is true 
then the record is expired, otherwise it is still alive. This could works 
pretty fine in the most cases, but in some case, we need to do time shift, 
otherwise it may cause some unexpected result when using the ProccessTime, I 
roughly describe two case as follow.

- when restoring the job from the savepoint

For example, the user set the TTL to 2h for the state, if he trigger a 
savepoint and restore the job from the savepoint after 2h(maybe some reason 
that delay he to restore the job quickly), then the restored job's previous 
state data are all expired.

- when the job spend a long time to recover from a failure

For example, there are many jobs running on a yarn session cluster, and the 
cluster configured to use the DFS to store the checkpoint data, but 
unfortunately, the DFS meet a strange problem which makes the jobs on the 
cluster begin to loop in recovery-fail-recovery-fail... the devs spend some 
time to address the issue of DFS and the jobs start working properly, but if 
the "{{system down time >= TTL}}" then the job's previous state data will be 
expired in this case.

To avoid the problems as above, we need to do time shift after the job 
recovering from checkpoint & savepoint. A possible approach is outlined in 
[6186|https://github.com/apache/flink/pull/6186].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-25 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522981#comment-16522981
 ] 

Sihua Zhou commented on FLINK-9585:
---

[~Zentol] thanks for pointing out this issue for me, I got it.

> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-24 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9585.
-
Resolution: Fixed

Merged in:
master: 5fa61d8ceac8f865002dc0ef84dc1a3c65753d0b

> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.

2018-06-21 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9633:
-

 Summary: Flink doesn't use the Savepoint path's filesystem to 
create the OuptutStream on Task.
 Key: FLINK-9633
 URL: https://issues.apache.org/jira/browse/FLINK-9633
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


Currently, flink use the Savepoint's filesystem to create the meta output 
stream in CheckpointCoordinator(JM side), but in StreamTask(TM side) it uses 
the Checkpoint's filesystem to create the checkpoint data output stream. When 
the Savepoint & Checkpoint in different filesystem this will lead to 
problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-06-20 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517878#comment-16517878
 ] 

Sihua Zhou commented on FLINK-9622:
---

one more instances: https://api.travis-ci.org/v3/job/394119180/log.txt

> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Priority: Major
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-06-20 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517874#comment-16517874
 ] 

Sihua Zhou commented on FLINK-9622:
---

cc [~dawidwys]

> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Priority: Major
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-06-20 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9622:
-

 Summary: DistributedCacheDfsTest failed on travis
 Key: FLINK-9622
 URL: https://issues.apache.org/jira/browse/FLINK-9622
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.

instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9619:
-

 Summary: Always close the task manager connection when the 
container is completed in YarnResourceManager
 Key: FLINK-9619
 URL: https://issues.apache.org/jira/browse/FLINK-9619
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


We should always eagerly close the connection with task manager when the 
container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread

2018-06-19 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517790#comment-16517790
 ] 

Sihua Zhou commented on FLINK-9417:
---

Hi [~till.rohrmann] One thing come to my mind, If we send heartbeat requests 
from RPC's main thread, then should we also do a checking for the 
HEARTBEAT_INTERVAL with a sanity min value(currently it only need to greater 
than 0)? If the user configure a very small value e.g 10, then the resource 
manager and the job master will be kept always very busy just for sending the 
heartbeat.

> Send heartbeat requests from RPC endpoint's main thread
> ---
>
> Key: FLINK-9417
> URL: https://issues.apache.org/jira/browse/FLINK-9417
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>
> Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat 
> requests to remote targets. This has the problem that we still see heartbeats 
> from this endpoint also if its main thread is currently blocked. Due to this, 
> the heartbeat response cannot be processed and the remote target times out. 
> On the remote side, this won't be noticed because it still receives the 
> heartbeat requests.
> A solution to this problem would be to send the heartbeat requests to the 
> remote thread through the RPC endpoint's main thread. That way, also the 
> heartbeats would be blocked if the main thread is blocked/busy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9613:
-

 Summary: YARNSessionCapacitySchedulerITCase failed because 
YarnTestBase.checkClusterEmpty()
 Key: FLINK-9613
 URL: https://issues.apache.org/jira/browse/FLINK-9613
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The test YARNSessionCapacitySchedulerITCase failed on travis because of 
.YarnTestBase.checkClusterEmpty().

https://api.travis-ci.org/v3/job/394017104/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9601) Snapshot of CopyOnWriteStateTable will failed when the amount of record is more than MAXIMUM_CAPACITY

2018-06-16 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9601:
--
Summary: Snapshot of CopyOnWriteStateTable will failed when the amount of 
record is more than MAXIMUM_CAPACITY  (was: Snapshot of CopyOnWriteStateTable 
will failed, when the amount of record is more than MAXIMUM_CAPACITY)

> Snapshot of CopyOnWriteStateTable will failed when the amount of record is 
> more than MAXIMUM_CAPACITY
> -
>
> Key: FLINK-9601
> URL: https://issues.apache.org/jira/browse/FLINK-9601
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> In short, the problem is that we reuse the `snaphotData` as the output array 
> when partitioning the input data, but the `snapshotData` is max length is `1 
> << 30`. So when the records in `CopyOnWriteStateTable` is more than `1 << 30` 
> (e.g. 1 <<30 + 1), then the check 
> `Preconditions.checkState(partitioningDestination.length >= 
> numberOfElements);` could be failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9601) Snapshot of CopyOnWriteStateTable will failed, when the amount of record is more than MAXIMUM_CAPACITY

2018-06-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9601:
-

 Summary: Snapshot of CopyOnWriteStateTable will failed, when the 
amount of record is more than MAXIMUM_CAPACITY
 Key: FLINK-9601
 URL: https://issues.apache.org/jira/browse/FLINK-9601
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


In short, the problem is that we reuse the `snaphotData` as the output array 
when partitioning the input data, but the `snapshotData` is max length is `1 << 
30`. So when the records in `CopyOnWriteStateTable` is more than `1 << 30` 
(e.g. 1 <<30 + 1), then the check 
`Preconditions.checkState(partitioningDestination.length >= numberOfElements);` 
could be failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-14 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9455:
-

Assignee: Till Rohrmann  (was: Sihua Zhou)

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-14 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513336#comment-16513336
 ] 

Sihua Zhou commented on FLINK-9455:
---

Hi [~till.rohrmann] the more I thought about this , the more I found it's 
tricky, since you maybe the best one that familiar with this related part. I 
don't want mess this up, I would leave this issue to you. 

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9584) Unclosed streams in Bucketing-/RollingSink

2018-06-14 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9584:
-

Assignee: Sihua Zhou

> Unclosed streams in Bucketing-/RollingSink
> --
>
> Key: FLINK-9584
> URL: https://issues.apache.org/jira/browse/FLINK-9584
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sihua Zhou
>Priority: Major
>
> There are 4 instances of {{FSDataOutputStream}} that are not properly closed 
> in the {{BucketingSink}} (2) and {{RollingSink}} (2).
>  
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java#L536]
>  
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java#L705]
>  
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L638]
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#882]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510828#comment-16510828
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow], I didn't see the email you sent yet, but I just had a look at your 
code, I think the "non-scale-able"  might be caused by your test code. From 
your code we could see that the source's parallelism is always the same as the 
other operators. And in the each sub-task of the source, you use the loop to 
mock the source records, that means the QPS of the source will increase when 
you trying to rescale up the parallelism of your job, in the end, you didn't 
scale up anything indeed. I would suggest to set the parallelism of the source 
to a fixed value(e.g. 4), and scale up the job, then let's see whether it's 
scalable. I didn't test your code on cluster yet, will test it later. My email 
is "summerle...@163.com", if you had problem to send email to 
"u...@flink.apache.org", you could send to me personally if you want. Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510757#comment-16510757
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi, I think you could send your question to the "u...@flink.apache.org".

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510750#comment-16510750
 ] 

Sihua Zhou commented on FLINK-9506:
---

I'm + 1 to close this ticket and move to ML, we can definitely continue the 
discussion there, I will try out your code, and give feedbacks tonight, please 
bear me several hours. Cause I currently busy with something others. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510716#comment-16510716
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] "If we further comment out recordStore.add() then everything works 
well, no more fluctuation" this surprised me, because for RocksDB backend the 
`listState.add()` is just merge(just put without any reading) the record into 
db, it's cheap in my mind. I had downloaded your code. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510649#comment-16510649
 ] 

Sihua Zhou commented on FLINK-9506:
---

Additional, do you make sure that you are using the RocksDb backend? Could you 
find the "Initializing RocksDB keyed state backend" in the TaskManager's log?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510647#comment-16510647
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] in the input_stop_when_timer_run.png, does the yellow line mean QPS 
of input, and the green line mean QPS of output? If this picture is captured 
when the onTimer is uncomment out, then it didn't surprise me, but if the 
picture is captured when the content of onTimer is commented out, then it 
surprised me a bit.

And you mentioned that, when the content of onTimer is commented out, the 
Fluctuation still exists. Does the commented out means that there is nothing in 
the onTimer()? If yes, I think it surprised me and for an additional could you 
also comment out the `recordStore.add()` in processElement(). If both the 
content of onTimer() and the `recordStore.add()` are commented out and the 
Fluctuation still there, I think the problem is related to the timer, because 
of the GC.

And I'm curious about the QPS of source for you job? and the degree of the 
parallelism of your job?

Thanks~



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510580#comment-16510580
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] I don't think this a limitation in Flink, we have more complex with 
terrible data flow on production but flink supports it very well. Let look into 
your case deeper. 

- Did you enable the checkpoint now? if yes, are you using incremental 
checkpoint? and what the checkpoint interval?
- could you try to comment the code that related to the accumulation in the 
`onTimer` and have a try? Specially, comment the line "listState.get()"
- Is it possible that you could somehow provide some code that related to the 
`ProcessAggregation` that you are using currentlly?

Thanks

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM:


Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation? In fact, it's a bit hard for me to believe the 
fluctuation is caused by the keyBy. AFAIK, it just controls which channel the 
record to go(when transfer between operators) and the content of the key stored 
in the RocksDB, without using any state the keyBy() should be cheap.

I think the picture related to he keyNoHash vs KeyHash is what I expected. With 
hash() the key's length is only 4 bytes and the distribution is uniform, 
without hash your key's length is 50 and also the distribution maybe not 
uniform. But with the hash() approach you could only get a approximate result, 
if that is enough for you then I think it's good to go now, is it not enough 
for you?



was (Author: sihuazhou):
Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation?

I think the picture related to he keyNoHash vs KeyHash is what I expected. With 
hash() the key's length is only 4 bytes and the distribution is uniform, 
without hash your key's length is 50 and also the distribution maybe not 
uniform. But with the hash() approach you could only get a approximate result, 
if that is enough for you then I think it's good to go now, is it not enough 
for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:26 PM:


Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation?

I think the picture related to he keyNoHash vs KeyHash is what I expected. With 
hash() the key's length is only 4 bytes and the distribution is uniform, 
without hash your key's length is 50 and also the distribution maybe not 
uniform. But with the hash() approach you could only get a approximate result, 
if that is enough for you then I think it's good to go now, is it not enough 
for you?



was (Author: sihuazhou):
Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation?

I think the picture related to he keyNoHash vs KeyHash is what I expected. 
Without hash() the key's length is only 4 bytes and the distribution is 
uniform, without hash your key's length is 50 and also the distribution maybe 
not uniform. But with the hash() approach you could only get a approximate 
result, if that is enough for you then I think it's good to go now, is it not 
enough for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation?

I think the picture related to he keyNoHash vs KeyHash is what I expected. 
Without hash() the key's length is only 4 bytes and the distribution is 
uniform, without hash your key's length is 50 and also the distribution maybe 
not uniform. But with the hash() approach you could only get a approximate 
result, if that is enough for you then I think it's good to go now, is it not 
enough for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-11 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507815#comment-16507815
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] I think we can close this ticket now, do you agree? Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9561) Lack of the api to set RocksDB Option by flink config

2018-06-09 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507249#comment-16507249
 ] 

Sihua Zhou commented on FLINK-9561:
---

Hi [~aitozi] I think flink already supported this, you can implement the 
{{OptionsFactory}} to override the {{OptionsFactory#createDBOptions}} to 
special your {{DBOptions}}, and override the 
{{OptionsFactory#createColumnOptions}} to special your {{ColumnFamilyOptions}}. 
A simple example likes like below

{code:java}
 rocksDbBackend.setOptions(new OptionsFactory() {
 
  public DBOptions createDBOptions(DBOptions currentOptions) {
  return currentOptions.setMaxOpenFiles(1024);
  }
 
 public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
 return 
currentOptions.setCompactionStyle(org.rocksdb.CompactionStyle.LEVEL);
 }
 });
{code}

> Lack of the api to set RocksDB Option by flink config
> -
>
> Key: FLINK-9561
> URL: https://issues.apache.org/jira/browse/FLINK-9561
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> we have serval PredefinedOptions for rocksdb options, but it is not 
> configurable, I think should support this to allow user to choose according 
> to the device. [~StephanEwen] do you think so?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504729#comment-16504729
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] From the top of my head, I list answers here:

- >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any 
other option?

RocksDB is needed to setup in every sub-tasks that use the KeyedState if you 
are using RocksDB backend.

- >> 2. What is the recommendation for RocksDB's statebackend? We are using 
tmpfs with checkpoint now with savepoint persists to hdfs.

Q1. I think the default configuration of the RocksDB backend is quite good for 
the most of the jobs.
Q2. I'm not sure whether I got you correctly, the savepoint is triggered 
manually, and checkpoint is triggered automatically, you means that you trigger 
the savepoint manually periodically?

- >> 3. By source code, rocksdb options like parallelism and certain predefined 
option could be configured, any corresponding parameter in flink_config.yaml?

AFAIK, RocksDB's options need to set in source code if you need to special it. 
The default parallelism of the operator can be configured in flink-conf.yaml

- >> 4. related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the 
checkpoint directory, I'm not sure if it's accessible to all TMs. If yes, I 
think that is ok, and also I didn't see your checkpoint interval...

BTW, you said you are using the {{r.getUNIQUE_KEY();}}  as the key, I'm a bit 
curious about it's length in general. If it's too long and if you don't need an 
exactly result, you could use the {{r.getUNIQUE_KEY().hashCode();}} instead, 
that may also help to improve the performance. And in fact, I also agree with 
[~kkrugler] that this type of question is best asked in the user mail list, 
that way more people could take part in and you might also get more ideals from 
them. ;)



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM:
---

[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it only put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key every second, you only need to do one read operation 
of RocksDB.





was (Author: sihuazhou):
[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key very seconds, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0

2018-06-07 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9546:
-

 Summary: The heartbeatTimeoutIntervalMs of HeartbeatMonitor should 
be larger than 0
 Key: FLINK-9546
 URL: https://issues.apache.org/jira/browse/FLINK-9546
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Sihua Zhou
Assignee: Sihua Zhou


The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
currently the arg check looks like
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}

it should be
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505
 ] 

Sihua Zhou commented on FLINK-9506:
---

[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key very seconds, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-06 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502935#comment-16502935
 ] 

Sihua Zhou commented on FLINK-9455:
---

[~till.rohrmann] Thanks for your reply, I agreed. I'll give a initial design 
document for this ticket probably on this weekend, but plz feel free to take 
over this anytime if you want to.

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-06-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502809#comment-16502809
 ] 

Sihua Zhou commented on FLINK-8845:
---

Hi [~noliran] Yes, this is on purpose. In fact, we 
tried([FLINK-8859|https://issues.apache.org/jira/browse/FLINK-8859]) to disable 
WAL in RocksDBKeyedStateBackend when restoring the backend, but it will cause 
segfaults on 
travis([FLINK-8882|https://issues.apache.org/jira/browse/FLINK-8882]), and the 
reason why it caused the segfaults is still not clear, so we 
reverted([8922|https://issues.apache.org/jira/browse/FLINK-8922]) it in the 
end. 

> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8753) Introduce savepoint that go though the incremental checkpoint path

2018-06-05 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-8753.
-
Resolution: Invalid

> Introduce savepoint that go though the incremental checkpoint path
> --
>
> Key: FLINK-8753
> URL: https://issues.apache.org/jira/browse/FLINK-8753
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> Right now, savepoint goes through the full checkpoint path, take a savepoint 
> could be slowly. In our production, for some long term job it often costs 
> more than 10min to complete a savepoint which is unacceptable for a real time 
> job, so we have to turn back to use the externalized checkpoint instead 
> currently. But the externalized  checkpoint has a time interval (checkpoint 
> interval) between the last time. So I proposal to introduce the increment 
> savepoint which goes through the increment checkpoint path.
> Any advice would be appreciated!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502064#comment-16502064
 ] 

Sihua Zhou commented on FLINK-9455:
---

Hi [~till.rohrmann], I think this is a bit tricky than it looks like, I'd like 
to have a brief discussion with you before jumping into the implementation

- Should we consider the situation that every request of the TM could with 
different ResourceProfile? e.g. different cores and different TM memory. 
Currently the TM's configuration is "immutable" for every cluster, but I can 
see the trend that community may support request a user specific TM according 
to their config.

- Should we hold the assumption that the ResourceProfile we requested from the 
ResourceManager is definitely the same as the actual ResourceProfile we got 
from the TM?

Or do you have any ideal on this ticket?

Thanks~
Sihua

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization

2018-06-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502037#comment-16502037
 ] 

Sihua Zhou commented on FLINK-8601:
---

Hi [~aljoscha] [~StephanEwen] would be really appreciate if anyone of you could 
take a look at this, I'm quite confidence that this would be a useful feature 
in many performance optimization scenarios...

> Introduce ElasticBloomFilter for Approximate calculation and other situations 
> of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h2. *Motivation*
> There are some scenarios drive us to introduce this ElasticBloomFilter, one 
> is Stream Join, another is Data Deduplication, and some special user 
> cases...This has given us a great experience, for example,  we implemented 
> the Runtime Filter Join base on it, and it gives us a great performance 
> improvement. With this feature, It diff us from the "normal stream join", 
> allows us to improve performance while reducing resource consumption by about 
> half!!!
> I will list the two most typical user cases that optimized by the 
> ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data 
> Dedeplication" in brief.
> *Scenario 1: Runtime Filter Join*
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the “joined ratio” of the stream join is often very 
> low, for example.
> - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, buy) log with the promotion_id to analysis the effect 
> of the promotion.
> - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the click_id to find which click of which AD 
> that brings the payment to do attribution.
> - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the property of the users.
> - ….so on
> All these cases have one common property, that is the joined ratio is very 
> low. Here is a example to describe it, we have 1 records from the left 
> stream, and 1 records from the right stream, and we execute  select * 
> from leftStream l join rightStream r on l.id = r.id , we only got 100 record 
> from the result, that is the case for low joined ratio, this is an example 
> for inner join, but it can also apply to left & right join.
> there are more example I can come up with low joined ratio…but the point I 
> want to raise up is that the low joined ratio of stream join in production is 
> a very common phenomenon(maybe even the almost common phenomenon in some 
> companies, at least in our company that is the case).
> *How to improve this?*
> We can see from the above case, 1 record join 1 record and we only 
> got 100 result, that means, we query the state 2 times (1 for the 
> left stream and 1 for the right stream) but only 100 of them are 
> meaningful!!! If we could reduce the useless query times, then we can 
> definitely improve the performance of stream join.
> the way we used to improve this is to introduce the Runtime Filter Join, the 
> mainly ideal is that, we build a filter for the state on each side (left 
> stream & right stream). When we need to query the state on that side we first 
> check the corresponding filter whether the key is possible in the state, if 
> the filter say "not, it impossible in the State", then we stop querying the 
> state, if it say "hmm, it maybe in state", then we need to query the state. 
> As you can see, the best choose of the filter is Bloom Filter, it has all the 
> feature that we want: extremely good performance, non-existence of false 
> negative.
> The simplest pseudo code for Runtime Filter Join(the comments is based on 
> RocksDBBackend)
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>   Iterator rightIterator = rigthStreamState.iterator();
>   // perform the `seek()` on the RocksDB, and iterator one by one,
>   // this is an expensive operation especially when the key can't be 
> found in RocksDB.
> 

[jira] [Closed] (FLINK-8602) Improve recovery performance for rocksdb backend

2018-06-05 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-8602.
-
Resolution: Done

All sub-tasks are done.

> Improve recovery performance for rocksdb backend
> 
>
> Key: FLINK-8602
> URL: https://issues.apache.org/jira/browse/FLINK-8602
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> Right now, RocksDB backend supports {{fully checkpoint}} & {{incremental 
> checkpoint}}. This issue try to improve the performance of recovery from 
> {{fully checkpoint}} as well as from {{incremental checkpoint}}. It contains 
> 2 sub-tasks.
> 1. improve recovery performance for incremental checkpoint when 
> {{hasExtraKeys = true}}
> 2. introduce `parallel recovery` from both {{fully checkpoint}} and 
> {{incremental checkpoint}} (Base on {{ingestExternalFile()}} and 
> {{SstFileWriter}} provided by RocksDB). 
> Any advice would be highly appreciated!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9043) Introduce a friendly way to resume the job from externalized checkpoints automatically

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501312#comment-16501312
 ] 

Sihua Zhou commented on FLINK-9043:
---

Okay, I see it is quite tricky to provide a completely automatic way, 
especially for s3...then how about take a step back and firstly extends the 
current strategy to allow use provide the previous jobId, we search for the 
last successful checkpoint(auto skip the corrupt checkpoint) automatically? 
[~StephanEwen] what do you think?

> Introduce a friendly way to resume the job from externalized checkpoints 
> automatically
> --
>
> Key: FLINK-9043
> URL: https://issues.apache.org/jira/browse/FLINK-9043
> Project: Flink
>  Issue Type: New Feature
>Reporter: godfrey johnson
>Assignee: Sihua Zhou
>Priority: Major
>
> I know a flink job can reovery from checkpoint with restart strategy, but can 
> not recovery as spark streaming jobs when job is starting.
> Every time, the submitted flink job is regarded as a new job, while , in the 
> spark streaming  job, which can detect the checkpoint directory first,  and 
> then recovery from the latest succeed one. However, Flink only can recovery 
> until the job failed first, then retry with strategy.
>  
> So, would flink support to recover from the checkpoint directly in a new job?
> h2. New description by [~sihuazhou]
> Currently, it's quite a bit not friendly for users to recover job from the 
> externalized checkpoint, user need to find the dedicate dir for the job which 
> is not a easy thing when there are too many jobs. This ticket attend to 
> introduce a more friendly way to allow the user to use the externalized 
> checkpoint to do recovery.
> The implementation steps are copied from the comments of [~StephanEwen]:
>  - We could make this an option where you pass a flag (-r) to automatically 
> look for the latest checkpoint in a given directory.
>  - If more than one jobs checkpointed there before, this operation would fail.
>  - We might also need a way to have jobs not create the UUID subdirectory, 
> otherwise the scanning for the latest checkpoint would not easily work.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499873#comment-16499873
 ] 

Sihua Zhou commented on FLINK-9506:
---

[~yow] I think RocksDB backend could give a more stable performance, but the 
peak performance may be reduced, anyway I think it worths a a try

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499867#comment-16499867
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/4/18 8:05 AM:
---

Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the 
KeyedStateBackend base on Heap, I think the fluctuation might caused by the 
capacity rescaling of the "Hash Map", but  I think the impaction should not be 
that obvious... Maybe [~srichter] could give some more useful and professional 
information...


was (Author: sihuazhou):
Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the 
KeyedStateBackend base on Heap, I think the fluctuation might caused by the 
capacity rescale of the "Hash Map", but  I think the impaction should not be 
that obvious... Maybe [~srichter] could give some more useful and professional 
information...

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499867#comment-16499867
 ] 

Sihua Zhou commented on FLINK-9506:
---

Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the 
KeyedStateBackend base on Heap, I think the fluctuation might caused by the 
capacity rescale of the "Hash Map", but  I think the impaction should not be 
that obvious... Maybe [~srichter] could give some more useful and professional 
information...

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499672#comment-16499672
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/4/18 2:22 AM:
---

Hi [~yow] could you please just replace the getKey() as follow and give a try?
{code:java}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() % 128; 
}
}
{code}
if this is work then I think the performance drop may cause by the state lookup.


was (Author: sihuazhou):
Hi [~yow] could you please just replace the getKey() as follow and give a try?
{code}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() / 128; 
}
}
{code}

if this is work then I think the performance drop may cause by the state lookup.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499672#comment-16499672
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] could you please just replace the getKey() as follow and give a try?
{code}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() / 128; 
}
}
{code}

if this is work then I think the performance drop may cause by the state lookup.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499395#comment-16499395
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] could you please give some information of the `keyBy()`? e.g. what 
are you keyed by in keyBy()? Is it also a POJO that with 50 string member or 
something others?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

2018-06-01 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497956#comment-16497956
 ] 

Sihua Zhou commented on FLINK-9486:
---

[~srichter] Thanks for your reply, looking forward your PR! ;)

> Introduce TimerState in keyed state backend
> ---
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> Goal of this PR is to introduce a timer state that is registered with the 
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the 
> {{StateTable}} that hold other forms of keyed state, and the implementation 
> is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this 
> state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an 
> intermediate state, and we will later also implement the alternative to store 
> the timers inside a column families in RocksDB. However, by taking this step, 
> we could also still offer the option to have RocksDB state with heap-based 
> timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

2018-06-01 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497940#comment-16497940
 ] 

Sihua Zhou commented on FLINK-9486:
---

Hi [~srichter] Do you already work on this? If not, I'd like to take this 
ticket if you don't mind?

> Introduce TimerState in keyed state backend
> ---
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> Goal of this PR is to introduce a timer state that is registered with the 
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the 
> {{StateTable}} that hold other forms of keyed state, and the implementation 
> is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this 
> state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an 
> intermediate state, and we will later also implement the alternative to store 
> the timers inside a column families in RocksDB. However, by taking this step, 
> we could also still offer the option to have RocksDB state with heap-based 
> timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly

2018-05-31 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9468:
--
Priority: Critical  (was: Blocker)

> get outputLimit of LimitedConnectionsFileSystem incorrectly
> ---
>
> Key: FLINK-9468
> URL: https://issues.apache.org/jira/browse/FLINK-9468
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit 
> incorrectly.
> {code:java}
> private  T createStream(
>   final SupplierWithException streamOpener,
>   final HashSet openStreams,
>   final boolean output) throws IOException {
> final int outputLimit = output && maxNumOpenInputStreams > 0 ? 
> maxNumOpenOutputStreams : Integer.MAX_VALUE;
> /**/
> }
> {code}
> should be 
> {code:java}
> private  T createStream(
>   final SupplierWithException streamOpener,
>   final HashSet openStreams,
>   final boolean output) throws IOException {
> final int outputLimit = output && maxNumOpenOutputStreams > 0 ? 
> maxNumOpenOutputStreams : Integer.MAX_VALUE;
> /**/
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494908#comment-16494908
 ] 

Sihua Zhou commented on FLINK-9480:
---

[~srichter] Thanks for your reply, the reason and the use case I want to 
improve this is because of the the online rescaling feature of 1.5. Currently,  
it works as follow:

- trigger a savepoint
- rescaling from the savepoint.

In order to let the online rescaling take advantage of local recovery, we need 
the local recovery to support rescaling, maybe it's not so strict that all node 
can only restore locally, but just a best effect, if some node can't find the 
local state it still can load data from remote.

Yes, I agree that this feature's priority is lower than "timer service" and 
"ttl state" and I just create it in case that we may want to do it in the 
future...

> Let local recovery support rescaling
> 
>
> Key: FLINK-9480
> URL: https://issues.apache.org/jira/browse/FLINK-9480
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Priority: Major
>
> Currently, local recovery only support restore from checkpoint and without 
> rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9480:
--
Affects Version/s: 1.5.0

> Let local recovery support rescaling
> 
>
> Key: FLINK-9480
> URL: https://issues.apache.org/jira/browse/FLINK-9480
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Priority: Major
>
> Currently, local recovery only support restore from checkpoint and without 
> rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9480:
--
Component/s: State Backends, Checkpointing

> Let local recovery support rescaling
> 
>
> Key: FLINK-9480
> URL: https://issues.apache.org/jira/browse/FLINK-9480
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Priority: Major
>
> Currently, local recovery only support restore from checkpoint and without 
> rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494880#comment-16494880
 ] 

Sihua Zhou commented on FLINK-9480:
---

[~stefanrichte...@gmail.com] What do you think of this?

> Let local recovery support rescaling
> 
>
> Key: FLINK-9480
> URL: https://issues.apache.org/jira/browse/FLINK-9480
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sihua Zhou
>Priority: Major
>
> Currently, local recovery only support restore from checkpoint and without 
> rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9480:
-

 Summary: Let local recovery support rescaling
 Key: FLINK-9480
 URL: https://issues.apache.org/jira/browse/FLINK-9480
 Project: Flink
  Issue Type: Improvement
Reporter: Sihua Zhou


Currently, local recovery only support restore from checkpoint and without 
rescaling. Maybe we should enable it to support rescaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9479) Let the rescale API to use local recovery

2018-05-30 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9479:
-

 Summary: Let the rescale API to use local recovery
 Key: FLINK-9479
 URL: https://issues.apache.org/jira/browse/FLINK-9479
 Project: Flink
  Issue Type: Improvement
  Components: REST, State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou


Currently, flink's online rescale api operates as the follow:
- trigger savepoint for the job
- rescaling the job from the savepoint

We should improve it to use the local recovery to speed up it and reduce the 
network pressure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9475) introduce an approximate version of "select distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9475:
-

 Summary: introduce an approximate version of "select distinct"
 Key: FLINK-9475
 URL: https://issues.apache.org/jira/browse/FLINK-9475
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Base on the "Elastic Bloom Filter", it easy to implement an approximate version 
of "select distinct" that have an excellent performance. Its accuracy should be 
configurable, e.g. 95%, 98%.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9474) Introduce an approximate version of "count distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9474:
-

 Summary: Introduce an approximate version of "count distinct"
 Key: FLINK-9474
 URL: https://issues.apache.org/jira/browse/FLINK-9474
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We can implement an approximate version of "count distinct" base on the 
"Elastic Bloom Filter", It could be very fast because we don't need to query 
the state anymore, its accuracy should could be configurable. e.g 95%, 98%.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink

2018-05-29 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9364:
--
Summary: Add doc of the memory usage in flink  (was: Add doc for the memory 
usage in flink)

> Add doc of the memory usage in flink
> 
>
> Key: FLINK-9364
> URL: https://issues.apache.org/jira/browse/FLINK-9364
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> We need to add a doc to describe the memory usage in flink, especially when 
> people use the RocksDBBackend, many people get confuse because of that (I've 
> saw serval question related to this on the user emails).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9468:
-

 Summary: get outputLimit of LimitedConnectionsFileSystem 
incorrectly
 Key: FLINK-9468
 URL: https://issues.apache.org/jira/browse/FLINK-9468
 Project: Flink
  Issue Type: Bug
  Components: FileSystem
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit 
incorrectly.
{code:java}
private  T createStream(
final SupplierWithException streamOpener,
final HashSet openStreams,
final boolean output) throws IOException {

final int outputLimit = output && maxNumOpenInputStreams > 0 ? 
maxNumOpenOutputStreams : Integer.MAX_VALUE;
/**/
}
{code}

should be 
{code:java}
private  T createStream(
final SupplierWithException streamOpener,
final HashSet openStreams,
final boolean output) throws IOException {

final int outputLimit = output && maxNumOpenOutputStreams > 0 ? 
maxNumOpenOutputStreams : Integer.MAX_VALUE;
/**/
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-05-28 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9455:
-

Assignee: Sihua Zhou

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-05-28 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9456:
-

Assignee: Sihua Zhou

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8205) Multi key get

2018-05-28 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492731#comment-16492731
 ] 

Sihua Zhou commented on FLINK-8205:
---

Hi [~kkl0u] are you still willing to help with the design and the reviews for 
this ticket?

> Multi key get
> -
>
> Key: FLINK-8205
> URL: https://issues.apache.org/jira/browse/FLINK-8205
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: Any
>Reporter: Martin Eden
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently the Java queryable state api only allows for fetching one key at a 
> time. It would be extremely useful and more efficient if a similar call 
> exists for submitting multiple keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization

2018-05-28 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492600#comment-16492600
 ] 

Sihua Zhou commented on FLINK-8601:
---

Hi [~aljoscha] could you please have a look at this? I updated the doc to the 
latest version.

> Introduce ElasticBloomFilter for Approximate calculation and other situations 
> of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h2. *Motivation*
> There are some scenarios drive us to introduce this ElasticBloomFilter, one 
> is Stream Join, another is Data Deduplication, and some special user 
> cases...This has given us a great experience, for example,  we implemented 
> the Runtime Filter Join base on it, and it gives us a great performance 
> improvement. With this feature, It diff us from the "normal stream join", 
> allows us to improve performance while reducing resource consumption by about 
> half!!!
> I will list the two most typical user cases that optimized by the 
> ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data 
> Dedeplication" in brief.
> *Scenario 1: Runtime Filter Join*
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the “joined ratio” of the stream join is often very 
> low, for example.
> - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, buy) log with the promotion_id to analysis the effect 
> of the promotion.
> - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the click_id to find which click of which AD 
> that brings the payment to do attribution.
> - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the property of the users.
> - ….so on
> All these cases have one common property, that is the joined ratio is very 
> low. Here is a example to describe it, we have 1 records from the left 
> stream, and 1 records from the right stream, and we execute  select * 
> from leftStream l join rightStream r on l.id = r.id , we only got 100 record 
> from the result, that is the case for low joined ratio, this is an example 
> for inner join, but it can also apply to left & right join.
> there are more example I can come up with low joined ratio…but the point I 
> want to raise up is that the low joined ratio of stream join in production is 
> a very common phenomenon(maybe even the almost common phenomenon in some 
> companies, at least in our company that is the case).
> *How to improve this?*
> We can see from the above case, 1 record join 1 record and we only 
> got 100 result, that means, we query the state 2 times (1 for the 
> left stream and 1 for the right stream) but only 100 of them are 
> meaningful!!! If we could reduce the useless query times, then we can 
> definitely improve the performance of stream join.
> the way we used to improve this is to introduce the Runtime Filter Join, the 
> mainly ideal is that, we build a filter for the state on each side (left 
> stream & right stream). When we need to query the state on that side we first 
> check the corresponding filter whether the key is possible in the state, if 
> the filter say "not, it impossible in the State", then we stop querying the 
> state, if it say "hmm, it maybe in state", then we need to query the state. 
> As you can see, the best choose of the filter is Bloom Filter, it has all the 
> feature that we want: extremely good performance, non-existence of false 
> negative.
> The simplest pseudo code for Runtime Filter Join(the comments is based on 
> RocksDBBackend)
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>   Iterator rightIterator = rigthStreamState.iterator();
>   // perform the `seek()` on the RocksDB, and iterator one by one,
>   // this is an expensive operation especially when the key can't be 
> found in RocksDB.
> for (Record recordFromRightState : rightIterator) {
>   ……...
> }
> }
>  
> void 

[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-05-28 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9410:
-

Assignee: mingleizhang  (was: Sihua Zhou)

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.6.0
>
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non blocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully

2018-05-25 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9351:
-

Assignee: Sihua Zhou

> RM stop assigning slot to Job because the TM killed before connecting to JM 
> successfully
> 
>
> Key: FLINK-9351
> URL: https://issues.apache.org/jira/browse/FLINK-9351
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> The steps are the following(copied from Stephan's comments in 
> [5931|https://github.com/apache/flink/pull/5931]):
> - JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager
> - ResourceManager starts a container with a TaskManager
> - TaskManager registers at ResourceManager, which tells the TaskManager to 
> push a slot to the JobManager.
> - TaskManager container is killed
> - The ResourceManager does not queue back the slot requests (AllocationIDs) 
> that it sent to the previous TaskManager, so the requests are lost and need 
> to time out before another attempt is tried.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9426) Harden RocksDBWriteBatchPerformanceTest.benchMark()

2018-05-23 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487276#comment-16487276
 ] 

Sihua Zhou commented on FLINK-9426:
---

one instance: https://travis-ci.org/apache/flink/jobs/382584476

> Harden RocksDBWriteBatchPerformanceTest.benchMark()
> ---
>
> Key: FLINK-9426
> URL: https://issues.apache.org/jira/browse/FLINK-9426
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> We use the assert to check the performance of WriteBatch is better than 
> Put(), this should be true in general, but in sometimes this could also be 
> false. We may need to follow the other tests under the 
> *org.apache.flink.contrib.streaming.state.benchmark.**, only use the timeout 
> property to valid the test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9426) Harden RocksDBWriteBatchPerformanceTest.benchMark()

2018-05-23 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9426:
-

 Summary: Harden RocksDBWriteBatchPerformanceTest.benchMark()
 Key: FLINK-9426
 URL: https://issues.apache.org/jira/browse/FLINK-9426
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


We use the assert to check the performance of WriteBatch is better than Put(), 
this should be true in general, but in sometimes this could also be false. We 
may need to follow the other tests under the 
*org.apache.flink.contrib.streaming.state.benchmark.**, only use the timeout 
property to valid the test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread

2018-05-23 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9417:
-

Assignee: Sihua Zhou

> Send heartbeat requests from RPC endpoint's main thread
> ---
>
> Key: FLINK-9417
> URL: https://issues.apache.org/jira/browse/FLINK-9417
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>
> Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat 
> requests to remote targets. This has the problem that we still see heartbeats 
> from this endpoint also if its main thread is currently blocked. Due to this, 
> the heartbeat response cannot be processed and the remote target times out. 
> On the remote side, this won't be noticed because it still receives the 
> heartbeat requests.
> A solution to this problem would be to send the heartbeat requests to the 
> remote thread through the RPC endpoint's main thread. That way, also the 
> heartbeats would be blocked if the main thread is blocked/busy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization

2018-05-23 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Description: 
h2. *Motivation*

There are some scenarios drive us to introduce this ElasticBloomFilter, one is 
Stream Join, another is Data Deduplication, and some special user cases...This 
has given us a great experience, for example,  we implemented the Runtime 
Filter Join base on it, and it gives us a great performance improvement. With 
this feature, It diff us from the "normal stream join", allows us to improve 
performance while reducing resource consumption by about half!!!
I will list the two most typical user cases that optimized by the 
ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data 
Dedeplication" in brief.

*Scenario 1: Runtime Filter Join*

In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state from the other side, this 
will lead to poor performance when the state size if huge. So, in production, 
we always need to spend a lot slots to handle stream join. But, indeed, we can 
improve this in somehow, there a phenomenon of stream join can be found in 
production. That's the “joined ratio” of the stream join is often very low, for 
example.
- stream join in promotion analysis: Job need to join the promotion log with 
the action(click, view, buy) log with the promotion_id to analysis the effect 
of the promotion.
- stream join in AD(advertising) attribution: Job need to join the AD click log 
with the item payment log on the click_id to find which click of which AD that 
brings the payment to do attribution.
- stream join in click log analysis of doc: Job need to join viewed log(doc 
viewed by users) with the click log (doc clicked by users) to analysis the 
reason of the click and the property of the users.
- ….so on

All these cases have one common property, that is the joined ratio is very low. 
Here is a example to describe it, we have 1 records from the left stream, 
and 1 records from the right stream, and we execute  select * from 
leftStream l join rightStream r on l.id = r.id , we only got 100 record from 
the result, that is the case for low joined ratio, this is an example for inner 
join, but it can also apply to left & right join.
there are more example I can come up with low joined ratio…but the point I want 
to raise up is that the low joined ratio of stream join in production is a very 
common phenomenon(maybe even the almost common phenomenon in some companies, at 
least in our company that is the case).

*How to improve this?*

We can see from the above case, 1 record join 1 record and we only got 
100 result, that means, we query the state 2 times (1 for the left 
stream and 1 for the right stream) but only 100 of them are meaningful!!! 
If we could reduce the useless query times, then we can definitely improve the 
performance of stream join.
the way we used to improve this is to introduce the Runtime Filter Join, the 
mainly ideal is that, we build a filter for the state on each side (left stream 
& right stream). When we need to query the state on that side we first check 
the corresponding filter whether the key is possible in the state, if the 
filter say "not, it impossible in the State", then we stop querying the state, 
if it say "hmm, it maybe in state", then we need to query the state. As you can 
see, the best choose of the filter is Bloom Filter, it has all the feature that 
we want: extremely good performance, non-existence of false negative.

The simplest pseudo code for Runtime Filter Join(the comments is based on 
RocksDBBackend)
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
Iterator rightIterator = rigthStreamState.iterator();
// perform the `seek()` on the RocksDB, and iterator one by one,
// this is an expensive operation especially when the key can't be 
found in RocksDB.
for (Record recordFromRightState : rightIterator) {
……...
}
}
 
void performRuntimeFilterJoin(Record recordFromLeftStream) {
Iterator rightIterator = EMPTY_ITERATOR;
if (rigthStreamfilter.containsCurrentKey()) {
rightIterator = rigthStreamState.iterator();
}
 // perform the `seek()` only when filter.containsCurrentKey() return true
for (Record recordFromRightState : rightIterator) {
...
}
 // add the current key into the filter of left stream.
leftStreamFilter.addCurrentKey();
}
{code}

*Scenario 2:  Data Deduplication*

We have implemented two general functions based on the ElasticBloomFilter. They 
are count(distinct x) and select distinct x, y, z from table. Unlike the 
Runtime Filter Join the result of this two functions is approximate, not 
exactly. There are used in the scenario where we don't need a 100% accurate 

[jira] [Updated] (FLINK-8601) Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization

2018-05-23 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Description: 

h2. *Motivation*

There are some scenarios drive us to introduce this ElasticBloomFilter, one is 
Stream Join, another is Data Deduplication, and some special user cases...This 
has given us a great experience, for example,  we implemented the Runtime 
Filter Join base on it, and it gives us a great performance improvement. With 
this feature, It diff us from the "normal stream join", allows us to improve 
performance while reducing resource consumption by about half!!!
I will list the two most typical user cases that optimized by the 
ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data 
Dedeplication" in brief.

*Scenario 1: Runtime Filter Join*

In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state from the other side, this 
will lead to poor performance when the state size if huge. So, in production, 
we always need to spend a lot slots to handle stream join. But, indeed, we can 
improve this in somehow, there a phenomenon of stream join can be found in 
production. That's the “joined ratio” of the stream join is often very low, for 
example.
- stream join in promotion analysis: Job need to join the promotion log with 
the action(click, view, buy) log with the promotion_id to analysis the effect 
of the promotion.
- stream join in AD(advertising) attribution: Job need to join the AD click log 
with the item payment log on the click_id to find which click of which AD that 
brings the payment to do attribution.
- stream join in click log analysis of doc: Job need to join viewed log(doc 
viewed by users) with the click log (doc clicked by users) to analysis the 
reason of the click and the property of the users.
- ….so on

All these cases have one common property, that is the joined ratio is very low. 
Here is a example to describe it, we have 1 records from the left stream, 
and 1 records from the right stream, and we execute  select * from 
leftStream l join rightStream r on l.id = r.id , we only got 100 record from 
the result, that is the case for low joined ratio, this is an example for inner 
join, but it can also apply to left & right join.
there are more example I can come up with low joined ratio…but the point I want 
to raise up is that the low joined ratio of stream join in production is a very 
common phenomenon(maybe even the almost common phenomenon in some companies, at 
least in our company that is the case).

*How to improve this?*

We can see from the above case, 1 record join 1 record and we only got 
100 result, that means, we query the state 2 times (1 for the left 
stream and 1 for the right stream) but only 100 of them are meaningful!!! 
If we could reduce the useless query times, then we can definitely improve the 
performance of stream join.
the way we used to improve this is to introduce the Runtime Filter Join, the 
mainly ideal is that, we build a filter for the state on each side (left stream 
& right stream). When we need to query the state on that side we first check 
the corresponding filter whether the key is possible in the state, if the 
filter say "not, it impossible in the State", then we stop querying the state, 
if it say "hmm, it maybe in state", then we need to query the state. As you can 
see, the best choose of the filter is Bloom Filter, it has all the feature that 
we want: extremely good performance, non-existence of false negative.

The simplest pseudo code for Runtime Filter Join(the comments is based on 
RocksDBBackend)
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
Iterator rightIterator = rigthStreamState.iterator();
// perform the `seek()` on the RocksDB, and iterator one by one,
// this is an expensive operation especially when the key can't be 
found in RocksDB.
for (Record recordFromRightState : rightIterator) {
……...
}
}
 
void performRuntimeFilterJoin(Record recordFromLeftStream) {
Iterator rightIterator = EMPTY_ITERATOR;
if (rigthStreamfilter.containsCurrentKey()) {
rightIterator = rigthStreamState.iterator();
}
 // perform the `seek()` only when filter.containsCurrentKey() return true
for (Record recordFromRightState : rightIterator) {
...
}
 // add the current key into the filter of left stream.
leftStreamFilter.addCurrentKey();
}
{code}

*Scenario 2:  Data Deduplication*

We have implemented two general functions based on the ElasticBloomFilter. They 
are count(distinct x) and select distinct x, y, z from table. Unlike the 
Runtime Filter Join the result of this two functions is approximate, not 
exactly. There are used in the scenario where we don't need a 100% accurate 

[jira] [Updated] (FLINK-9070) Improve performance of RocksDBMapState.clear()

2018-05-22 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9070:
--
Affects Version/s: (was: 1.6.0)
   1.5.0

> Improve performance of RocksDBMapState.clear()
> --
>
> Key: FLINK-9070
> URL: https://issues.apache.org/jira/browse/FLINK-9070
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, RocksDBMapState.clear() is implemented by iterating over all the 
> keys and drop them one by one. This iteration can be quite slow with: 
>  * Large maps
>  * High-churn maps with a lot of tombstones
> There are a few methods to speed-up deletion for a range of keys, each with 
> their own caveats:
>  * DeleteRange: still experimental, likely buggy
>  * DeleteFilesInRange + CompactRange: only good for large ranges
>  
> Flink can also keep a list of inserted keys in-memory, then directly delete 
> them without having to iterate over the Rocksdb database again. 
>  
> Reference:
>  * [RocksDB article about range 
> deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys]
>  * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9070) Improve performance of RocksDBMapState.clear()

2018-05-22 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9070:
--
Fix Version/s: (was: 1.6.0)
   1.5.0

> Improve performance of RocksDBMapState.clear()
> --
>
> Key: FLINK-9070
> URL: https://issues.apache.org/jira/browse/FLINK-9070
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, RocksDBMapState.clear() is implemented by iterating over all the 
> keys and drop them one by one. This iteration can be quite slow with: 
>  * Large maps
>  * High-churn maps with a lot of tombstones
> There are a few methods to speed-up deletion for a range of keys, each with 
> their own caveats:
>  * DeleteRange: still experimental, likely buggy
>  * DeleteFilesInRange + CompactRange: only good for large ranges
>  
> Flink can also keep a list of inserted keys in-memory, then directly delete 
> them without having to iterate over the Rocksdb database again. 
>  
> Reference:
>  * [RocksDB article about range 
> deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys]
>  * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-05-22 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9410:
-

Assignee: Sihua Zhou

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non blocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint

2018-05-18 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9401.
-
Resolution: Invalid

> Data lost when rescaling the job from incremental checkpoint
> 
>
> Key: FLINK-9401
> URL: https://issues.apache.org/jira/browse/FLINK-9401
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> We may lost data when rescaling job from incremental checkpoint because of 
> the following code.
> {code:java}
> try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, 
> columnFamilyHandle)) {
>int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
>byte[] startKeyGroupPrefixBytes = new 
> byte[stateBackend.keyGroupPrefixBytes];
>for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>   startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
>}
>iterator.seek(startKeyGroupPrefixBytes);
>while (iterator.isValid()) {
>   int keyGroup = 0;
>   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>  keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>   }
>   if (stateBackend.keyGroupRange.contains(keyGroup)) {
>  stateBackend.db.put(targetColumnFamilyHandle,
> iterator.key(), iterator.value());
>   }
>   iterator.next();
>}
> }
> {code}
> For every state handle to fetch the target data, we 
> _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be 
> INVALID immediately if the state handle's _start key group_ is bigger that 
> _state.keyGroupRange.getStartKeyGroup()_. Then, data lost...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint

2018-05-18 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9401:
-

 Summary: Data lost when rescaling the job from incremental 
checkpoint
 Key: FLINK-9401
 URL: https://issues.apache.org/jira/browse/FLINK-9401
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2, 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We may lost data when rescaling job from incremental checkpoint because of the 
following code.
{code:java}
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, 
columnFamilyHandle)) {

   int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
  startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
   }

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

  int keyGroup = 0;
  for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
 keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
  }

  if (stateBackend.keyGroupRange.contains(keyGroup)) {
 stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
  }

  iterator.next();
   }
}
{code}

For every state handle to fetch the target data, we 
_seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be 
INVALID immediately if the state handle's _start key group_ is bigger that 
_state.keyGroupRange.getStartKeyGroup()_. Then, data lost...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs

2018-05-17 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480041#comment-16480041
 ] 

Sihua Zhou commented on FLINK-9375:
---

Hi [~srichter][~yanghua]I think this is a bit looks like a duplicate of this 
[FLINK-8871|https://issues.apache.org/jira/browse/FLINK-8871] which needs a 
good discussion as you([~srichter]) have mentioned, or 
[FLINK-8871|https://issues.apache.org/jira/browse/FLINK-8871] should be blocked 
by this ticket (which only finish the RPC related works)? Maybe we should 
connect these two guys together to get a better picture...what do you think?

> Introduce AbortCheckpoint message from JM to TMs
> 
>
> Key: FLINK-9375
> URL: https://issues.apache.org/jira/browse/FLINK-9375
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Major
>
> We should introduce an {{AbortCheckpoint}} message that a jobmanager can send 
> to taskmanagers if a checkpoint is canceled so that the operators can eagerly 
> stop their alignment phase and continue to normal processing. This can reduce 
> some backpressure issues in the context of canceled and restarted checkpoints.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-17 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479462#comment-16479462
 ] 

Sihua Zhou commented on FLINK-9373:
---

[~srichter] FYI [3558|https://github.com/facebook/rocksdb/issues/3558], got 
reply from RocksDB. I think we chosen the right way that should go ;), cause 
the status could be reset.

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> Currently, when using RocksIterator we only use the _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough, if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should 
> find that even if _iterator.isValid()=true_, there may also exist some 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check the internal error of _RocksDB_. There is a case 
> from user email seems to lost data because of this 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-17 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478865#comment-16478865
 ] 

Sihua Zhou commented on FLINK-9373:
---

I will updated the PR quickly.

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, when using RocksIterator we only use the _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough, if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should 
> find that even if _iterator.isValid()=true_, there may also exist some 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check the internal error of _RocksDB_. There is a case 
> from user email seems to lost data because of this 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-17 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478848#comment-16478848
 ] 

Sihua Zhou commented on FLINK-9373:
---

I think that makes sense.

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, when using RocksIterator we only use the _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough, if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should 
> find that even if _iterator.isValid()=true_, there may also exist some 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check the internal error of _RocksDB_. There is a case 
> from user email seems to lost data because of this 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477433#comment-16477433
 ] 

Sihua Zhou commented on FLINK-8918:
---

[~fhueske] related to the saturated problem, I want to add some more 
informations. In ElasticBloomFilter, bloom filter are allocated lazily, and can 
be scalable. But yes, we should discuss this more deeply on FLINK-8601.

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the `joined ratio` of the stream join is often very 
> low, for example.
>  - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, payment, collection, retweet) log with the 
> `promotion_id` to analysis the effect of the promotion.
>  - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the `click_id` to find which click of which 
> AD that brings the payment to do attribution.
>  - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the property of the users.
>  - ….so on
> All these cases have one common property, that is the _joined ratio_ is very 
> low. Here is a example to describe it, imagine that, we have 1 records 
> from the left stream, and 1 records from the right stream, and we execute 
> _select * from leftStream l join rightStream r on l.id = r.id_ , we only got 
> 100 record from the result, that is the case for low _joined ratio_, this is 
> an example for inner join, but it can also apply to left & right join.
> there are more example I can come up with low _joined ratio_ , but the most 
> important point I want to expressed is that, the low _joined ratio_ of stream 
> join in production is a very common phenomenon(maybe the almost common 
> phenomenon in some companies, at least in our company that is the case).
> *Then how to improve it?*
> We can see from the above case, 1 record join 1 record we only got 
> 100 result, that means, we query the state 2 times (1 for the left 
> stream and 1 for the right stream) but only 100 of them are meaningful!!! 
> If we could reduce the useless query times, then we can definitely improve 
> the performance of stream join.
> the way we used to improve this is to introduce the _Runtime Filter Join_, 
> the mainly ideal is that, we build a _filter_ for the state on each side 
> (left stream & right stream). When we need to query the state on that side we 
> first check the corresponding _filter_ whether the _key_ is possible in the 
> state, if the _filter_ say "not, it impossible in the state", then we stop 
> querying the state, if it say "hmm, it maybe in state", then we need to query 
> the state. As you can see, the best choose of the _filter_ is _Bloom Filter_, 
> it has all the feature that we expected: _extremely good performance_, 
> _non-existence of false negative_.
>  
> *the simplest pseudo code for _Runtime Filter Join_(the comments inline are 
> based on RocksDBBackend)*
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
> Iterator rightIterator = rigthStreamState.iterator();
> // perform the `seek()` on the RocksDB, and iterator one by one,
> // this is an expensive operation especially when the key can't be found 
> in RocksDB.
> for (Record recordFromRightState : rightIterator) {
> ...
> }
> }
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
> Iterator rightIterator = EMPTY_ITERATOR;
> if (rigthStreamfilter.containsCurrentKey()) {
> rightIterator = rigthStreamState.iterator();
> }
> // perform the `seek()` only when filter.containsCurrentKey() return true
> for (Record recordFromRightState : rightIterator) {
> ...
> }
> 
> // add the current key into the filter of left stream.
>   leftStreamFilter.addCurrentKey();
> }
> {code}
> A description of Runtime Filter Join for batch join can be found 
> [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>  

[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477429#comment-16477429
 ] 

Sihua Zhou commented on FLINK-8918:
---

Hi [~fhueske], sorry that I'm definitely not asked you to promise to take this 
into 1.6 indeed...I just need to know that the community won't object this 
feature obviously before I'm starting on it. I agree with you that we should 
get a discussion about 
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] firstly, I will 
appreciate it very much if you could join the discussion! Thanks!

Best, Sihua 

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the `joined ratio` of the stream join is often very 
> low, for example.
>  - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, payment, collection, retweet) log with the 
> `promotion_id` to analysis the effect of the promotion.
>  - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the `click_id` to find which click of which 
> AD that brings the payment to do attribution.
>  - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the property of the users.
>  - ….so on
> All these cases have one common property, that is the _joined ratio_ is very 
> low. Here is a example to describe it, imagine that, we have 1 records 
> from the left stream, and 1 records from the right stream, and we execute 
> _select * from leftStream l join rightStream r on l.id = r.id_ , we only got 
> 100 record from the result, that is the case for low _joined ratio_, this is 
> an example for inner join, but it can also apply to left & right join.
> there are more example I can come up with low _joined ratio_ , but the most 
> important point I want to expressed is that, the low _joined ratio_ of stream 
> join in production is a very common phenomenon(maybe the almost common 
> phenomenon in some companies, at least in our company that is the case).
> *Then how to improve it?*
> We can see from the above case, 1 record join 1 record we only got 
> 100 result, that means, we query the state 2 times (1 for the left 
> stream and 1 for the right stream) but only 100 of them are meaningful!!! 
> If we could reduce the useless query times, then we can definitely improve 
> the performance of stream join.
> the way we used to improve this is to introduce the _Runtime Filter Join_, 
> the mainly ideal is that, we build a _filter_ for the state on each side 
> (left stream & right stream). When we need to query the state on that side we 
> first check the corresponding _filter_ whether the _key_ is possible in the 
> state, if the _filter_ say "not, it impossible in the state", then we stop 
> querying the state, if it say "hmm, it maybe in state", then we need to query 
> the state. As you can see, the best choose of the _filter_ is _Bloom Filter_, 
> it has all the feature that we expected: _extremely good performance_, 
> _non-existence of false negative_.
>  
> *the simplest pseudo code for _Runtime Filter Join_(the comments inline are 
> based on RocksDBBackend)*
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
> Iterator rightIterator = rigthStreamState.iterator();
> // perform the `seek()` on the RocksDB, and iterator one by one,
> // this is an expensive operation especially when the key can't be found 
> in RocksDB.
> for (Record recordFromRightState : rightIterator) {
> ...
> }
> }
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
> Iterator rightIterator = EMPTY_ITERATOR;
> if (rigthStreamfilter.containsCurrentKey()) {
> rightIterator = rigthStreamState.iterator();
> }
> // perform the `seek()` only when filter.containsCurrentKey() return true
> for (Record recordFromRightState : rightIterator) {
> ...
> }
> 
> // add the current key into the filter of left stream.
>   

[jira] [Comment Edited] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477373#comment-16477373
 ] 

Sihua Zhou edited comment on FLINK-8918 at 5/16/18 12:49 PM:
-

Hi [~fhueske] thank you very much for your reply, you are right there are a few 
challenges with using a bloom filter to implement the Runtime Filter Join. I 
have created a separate JIRA 
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] (which this issue 
is blocked by) to address that, and in that ticket I propose to introduce a 
_ElasticBloomFilter_ which could handle the question that you mentioned above, 
there also a google doc that related to it, but it's a bit outdated now... I'm 
writing a new one and preparing to fire a DISCUSSION on dev mail once 1.5 is 
releasing out. Here, I'd like to answer your question one by one in a bref 
version, we can discussion it on detail on the dev mail later:

> Bloom Filter Data Structure: it is not possible to remove a key from a bloom 
> filter

Yes, to remove a record from the Bloom Filter is impossible, but we can somehow 
implement a relax TTL for the data, which could help to release the memory.

> Checkpoint, Recovery, Rescaling

The ElasticBloomFilter(as you've pointed out is scoped to key group) introduced 
in  [FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] will support 
checkpoint & recovery & rescaling. More over, it even support to handle data 
skewed which is also a big problem when using the bloom filter in FLINK.

Additional, with introducing that type of ElasticBloomFilter there are some 
other interesting things we can do upon it...What do you think? do you object 
this to go into 1.6? If not I going to fire a DISCUSSION on the dev mail once 
1.5 is released out (as I mentioned about)...

Best, Sihua



was (Author: sihuazhou):
Hi [~fhueske] thank you very much for your reply, you are right there are a few 
challenges with using a bloom filter to implement the Runtime Filter Join. I 
have created a separate JIRA 
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] (which this issue 
is blocked by) to address that, and in that ticket I propose to introduce a 
_ElasticBloomFilter_ which could handle the question that you mentioned above, 
there also a google doc that related to it, but it's a bit outdated now... I'm 
writing a new one and preparing to fire a DISCUSSION on dev mail once 1.5 is 
releasing out. Here, I'd like to answer your question one by one in a bref 
version, we can discussion it on detail on the dev mail later:

> Bloom Filter Data Structure: it is not possible to remove a key from a bloom 
> filter

Yes, to remove a record from the Bloom Filter is impossible, but we can somehow 
implement a relax TTL for the data, which could help to release the memory.

> Checkpoint, Recovery, Rescaling

The ElasticBloomFilter introduced in  
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] will support 
checkpoint & recovery & rescaling. More over, it even support to handle data 
skewed which is also a big problem when using the bloom filter in FLINK.

Additional, with introducing that type of ElasticBloomFilter there are some 
other interesting things we can do upon it...What do you think? do you object 
this to go into 1.6? If not I going to fire a DISCUSSION on the dev mail once 
1.5 is released out (as I mentioned about)...

Best, Sihua


> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the `joined ratio` of the stream join is often very 
> low, for example.
>  - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, payment, collection, retweet) log with the 
> `promotion_id` to analysis the effect of the promotion.
>  - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the `click_id` to find which click of which 
> AD that brings the payment to do attribution.
>  - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the 

[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477373#comment-16477373
 ] 

Sihua Zhou commented on FLINK-8918:
---

Hi [~fhueske] thank you very much for your reply, you are right there are a few 
challenges with using a bloom filter to implement the Runtime Filter Join. I 
have created a separate JIRA 
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] (which this issue 
is blocked by) to address that, and in that ticket I propose to introduce a 
_ElasticBloomFilter_ which could handle the question that you mentioned above, 
there also a google doc that related to it, but it's a bit outdated now... I'm 
writing a new one and preparing to fire a DISCUSSION on dev mail once 1.5 is 
releasing out. Here, I'd like to answer your question one by one in a bref 
version, we can discussion it on detail on the dev mail later:

> Bloom Filter Data Structure: it is not possible to remove a key from a bloom 
> filter

Yes, to remove a record from the Bloom Filter is impossible, but we can somehow 
implement a relax TTL for the data, which could help to release the memory.

> Checkpoint, Recovery, Rescaling

The ElasticBloomFilter introduced in  
[FLINK-8601|https://issues.apache.org/jira/browse/FLINK-8601] will support 
checkpoint & recovery & rescaling. More over, it even support to handle data 
skewed which is also a big problem when using the bloom filter in FLINK.

Additional, with introducing that type of ElasticBloomFilter there are some 
other interesting things we can do upon it...What do you think? do you object 
this to go into 1.6? If not I going to fire a DISCUSSION on the dev mail once 
1.5 is released out (as I mentioned about)...

Best, Sihua


> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> In general, stream join is one of the most performance cost task. For every 
> record from both side, we need to query the state from the other side, this 
> will lead to poor performance when the state size if huge. So, in production, 
> we always need to spend a lot slots to handle stream join. But, indeed, we 
> can improve this in somehow, there a phenomenon of stream join can be found 
> in production. That's the `joined ratio` of the stream join is often very 
> low, for example.
>  - stream join in promotion analysis: Job need to join the promotion log with 
> the action(click, view, payment, collection, retweet) log with the 
> `promotion_id` to analysis the effect of the promotion.
>  - stream join in AD(advertising) attribution: Job need to join the AD click 
> log with the item payment log on the `click_id` to find which click of which 
> AD that brings the payment to do attribution.
>  - stream join in click log analysis of doc: Job need to join viewed log(doc 
> viewed by users) with the click log (doc clicked by users) to analysis the 
> reason of the click and the property of the users.
>  - ….so on
> All these cases have one common property, that is the _joined ratio_ is very 
> low. Here is a example to describe it, imagine that, we have 1 records 
> from the left stream, and 1 records from the right stream, and we execute 
> _select * from leftStream l join rightStream r on l.id = r.id_ , we only got 
> 100 record from the result, that is the case for low _joined ratio_, this is 
> an example for inner join, but it can also apply to left & right join.
> there are more example I can come up with low _joined ratio_ , but the most 
> important point I want to expressed is that, the low _joined ratio_ of stream 
> join in production is a very common phenomenon(maybe the almost common 
> phenomenon in some companies, at least in our company that is the case).
> *Then how to improve it?*
> We can see from the above case, 1 record join 1 record we only got 
> 100 result, that means, we query the state 2 times (1 for the left 
> stream and 1 for the right stream) but only 100 of them are meaningful!!! 
> If we could reduce the useless query times, then we can definitely improve 
> the performance of stream join.
> the way we used to improve this is to introduce the _Runtime Filter Join_, 
> the mainly ideal is that, we build a _filter_ for the state on each side 
> (left stream & right stream). When we need to query the state on that side we 
> first check the corresponding _filter_ whether the _key_ is possible in the 
> state, if the _filter_ say "not, it impossible in the state", then we stop 
> querying the state, if it say "hmm, it maybe in state", then we need to query 
> the state. As you can 

[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Fix Version/s: 1.5.0

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, when using RocksIterator we only use the _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough, if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should 
> find that even if _iterator.isValid()=true_, there may also exist some 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check the internal error of _RocksDB_. There is a case 
> from user email seems to lost data because of this 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Summary: Fix potential data losing for RocksDBBackend  (was: Always call 
RocksIterator.status() to check the internal error of RocksDB)

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using RocksIterator we only use the _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough, if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should 
> find that even if _iterator.isValid()=true_, there may also exist some 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check the internal error of _RocksDB_. There is a case 
> from user email seems to lost data because of this 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8918:
--
Description: 
In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state from the other side, this 
will lead to poor performance when the state size if huge. So, in production, 
we always need to spend a lot slots to handle stream join. But, indeed, we can 
improve this in somehow, there a phenomenon of stream join can be found in 
production. That's the `joined ratio` of the stream join is often very low, for 
example.
 - stream join in promotion analysis: Job need to join the promotion log with 
the action(click, view, payment, collection, retweet) log with the 
`promotion_id` to analysis the effect of the promotion.

 - stream join in AD(advertising) attribution: Job need to join the AD click 
log with the item payment log on the `click_id` to find which click of which AD 
that brings the payment to do attribution.
 - stream join in click log analysis of doc: Job need to join viewed log(doc 
viewed by users) with the click log (doc clicked by users) to analysis the 
reason of the click and the property of the users.
 - ….so on

All these cases have one common property, that is the _joined ratio_ is very 
low. Here is a example to describe it, imagine that, we have 1 records from 
the left stream, and 1 records from the right stream, and we execute 
_select * from leftStream l join rightStream r on l.id = r.id_ , we only got 
100 record from the result, that is the case for low _joined ratio_, this is an 
example for inner join, but it can also apply to left & right join.

there are more example I can come up with low _joined ratio_ , but the most 
important point I want to expressed is that, the low _joined ratio_ of stream 
join in production is a very common phenomenon(maybe the almost common 
phenomenon in some companies, at least in our company that is the case).

*Then how to improve it?*

We can see from the above case, 1 record join 1 record we only got 100 
result, that means, we query the state 2 times (1 for the left stream 
and 1 for the right stream) but only 100 of them are meaningful!!! If we 
could reduce the useless query times, then we can definitely improve the 
performance of stream join.

the way we used to improve this is to introduce the _Runtime Filter Join_, the 
mainly ideal is that, we build a _filter_ for the state on each side (left 
stream & right stream). When we need to query the state on that side we first 
check the corresponding _filter_ whether the _key_ is possible in the state, if 
the _filter_ say "not, it impossible in the state", then we stop querying the 
state, if it say "hmm, it maybe in state", then we need to query the state. As 
you can see, the best choose of the _filter_ is _Bloom Filter_, it has all the 
feature that we expected: _extremely good performance_, _non-existence of false 
negative_.

 


*the simplest pseudo code for _Runtime Filter Join_(the comments inline are 
based on RocksDBBackend)*
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
Iterator rightIterator = rigthStreamState.iterator();
// perform the `seek()` on the RocksDB, and iterator one by one,
// this is an expensive operation especially when the key can't be found in 
RocksDB.
for (Record recordFromRightState : rightIterator) {
...
}
}

void performRuntimeFilterJoin(Record recordFromLeftStream) {
Iterator rightIterator = EMPTY_ITERATOR;
if (rigthStreamfilter.containsCurrentKey()) {
rightIterator = rigthStreamState.iterator();
}
// perform the `seek()` only when filter.containsCurrentKey() return true
for (Record recordFromRightState : rightIterator) {
...
}

// add the current key into the filter of left stream.
leftStreamFilter.addCurrentKey();
}
{code}

A description of Runtime Filter Join for batch join can be found 
[here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
 (even though it not for stream join original, but we can easily refer it to 
`stream join`)

  was:
In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state from the other side, this 
will lead to poor performance when the state size if huge. So, in production, 
we always need to spend a lot slots to handle stream join. But, indeed, we can 
improve this in somehow, there a phenomenon of stream join can be found in 
production. That's the `joined ratio` of the stream join is often very low, for 
example.
 - stream join in promotion analysis: Job need to join the promotion log with 
the action(click, view, payment, collection, retweet) log with the 
`promotion_id` to analysis the effect of the promotion.

 

  1   2   3   4   5   >