[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2024-01-10 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805124#comment-17805124
 ] 

Zakelly Lan commented on FLINK-33881:
-

This is great! Well done.

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> 
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)' consumes a lot of CPU.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that even if none of the elements have expired, 
> TtlListState#getUnexpiredOrNull still copies all of them, and the whole 
> list/map is then updated in TtlIncrementalCleanup#runCleanup().
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull as follows:
> 1) find the index of the first expired element in the list;
> 2) if none is found, return the original list;
> 3) if one is found, construct the unexpired list (put the elements before it 
> into the list), then go through the subsequent elements, adding the unexpired 
> ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     // ...
>     // Pass 1: find the first expired element without copying anything.
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues; // nothing expired: return the original ttlValues, no copy
>     }
>     // Pass 2: deep-copy only the elements that are still alive.
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // elements before the first expired one are known to be unexpired
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     // ...
> } {code}
> *In this way, the extra iteration overhead is very small, but the benefit 
> when there are no expired elements is significant.*
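The three steps above can be sketched as a self-contained Java program. This is a minimal illustration under stated assumptions, not Flink's implementation: `TimestampedValue` and `UnexpiredFilter` are hypothetical stand-ins for `TtlValue` and `TtlListState`, the `deepCopy` operator plays the role of `elementSerializer.copy`, the expiry rule `lastAccessTimestamp + ttl <= now` is assumed (overflow is ignored), and returning `null` when every element has expired is an assumption based on the method name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for TtlValue: a user value plus its last-access timestamp. */
final class TimestampedValue<T> {
    final T value;
    final long lastAccessTimestamp;

    TimestampedValue(T value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }
}

final class UnexpiredFilter {
    /** Assumed expiry rule: expired once lastAccessTimestamp + ttl <= now (overflow ignored). */
    static boolean expired(TimestampedValue<?> v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /**
     * Returns the input list itself if nothing has expired, null if everything has
     * expired (assumption), otherwise a new list of deep copies of the unexpired elements.
     */
    static <T> List<TimestampedValue<T>> getUnexpiredOrNull(
            List<TimestampedValue<T>> values,
            long ttl,
            long now,
            UnaryOperator<TimestampedValue<T>> deepCopy) {
        // Pass 1: locate the first expired element without copying anything.
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired(values.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        if (firstExpired == -1) {
            return values; // fast path: same instance, zero copies
        }
        List<TimestampedValue<T>> unexpired = new ArrayList<>(values.size());
        // Elements before firstExpired were already checked in pass 1 and are unexpired.
        for (int i = 0; i < firstExpired; i++) {
            unexpired.add(deepCopy.apply(values.get(i)));
        }
        // Elements after firstExpired have not been checked yet.
        for (int i = firstExpired + 1; i < values.size(); i++) {
            TimestampedValue<T> v = values.get(i);
            if (!expired(v, ttl, now)) {
                unexpired.add(deepCopy.apply(v));
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }
}
```

Each element's expiry is evaluated at most once across the two passes, which is why the extra iteration cost stays small. Because the fast path returns the input list itself, a caller (for example, an incremental cleanup loop) could compare `result == input` and skip the state write-back as well; that identity check in the caller is likewise an assumption of this sketch.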



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2024-01-10 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805110#comment-17805110
 ] 

Jinzhong Li commented on FLINK-33881:
-

[~Zakelly] I have run the performance tests with TtlListStateBenchmark. The 
results show that TtlListState gets about a 27%+ performance improvement with 
this optimization.

 
|Benchmark|Param: backendType|Param: expiredOption|Param: stateVisibility|Param: updateType|Score *without* optimization|Score *with* optimization|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|686.11|902.399321|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|713.428757|906.389711|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|720.125366|902.577086|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|736.226185|881.717185|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|733.647731|894.295796|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|721.162458|901.177007|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|731.355353|890.06144|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|726.943995|909.319816|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|212.313443|261.96109|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|211.863397|258.266859|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|215.67765|259.52777|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|208.505043|263.768959|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|201.646868|257.422463|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|214.599971|267.296024|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|207.070109|263.741483|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|213.35357|265.783929|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|536.94471|688.041948|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|538.545608|678.766982|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|530.422217|688.008454|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|540.517729|690.383004|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|538.089004|695.018344|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|532.924439|690.375419|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|513.289823|715.811655|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|534.967481|724.869126|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|630.801404|822.861466|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|495.627707|678.028432|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|645.222027|780.880865|
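As a quick cross-check of the table, the relative change for a row is (with - without) / without * 100. The Java snippet below applies that formula to the first row of each benchmark method, with the scores copied from the table; it assumes, as the comment above implies, that a higher score means better performance. `BenchmarkDelta` is a hypothetical helper, not part of the Flink benchmarks.

```java
/** Hypothetical helper: relative change between two benchmark scores, in percent. */
final class BenchmarkDelta {
    static double improvementPercent(double without, double with) {
        return (with - without) / without * 100.0;
    }

    public static void main(String[] args) {
        // {without, with} pairs copied from the first row of each benchmark method above.
        String[] names = {"listAdd", "listAddAll", "listAppend", "listGet"};
        double[][] scores = {
            {686.11, 902.399321},
            {212.313443, 261.96109},
            {536.94471, 688.041948},
            {630.801404, 822.861466},
        };
        for (int i = 0; i < names.length; i++) {
            System.out.printf("%s: %+.1f%%%n", names[i], improvementPercent(scores[i][0], scores[i][1]));
        }
    }
}
```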

[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2024-01-09 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804975#comment-17804975
 ] 

Jinzhong Li commented on FLINK-33881:
-

[~Zakelly] [~masteryhx] 

I have opened a PR; could you help review it?

>>>  "And actually I think there may be some value if we could make sure it is 
>>>safe to do shallow copy."

I don't think it is safe to do a shallow copy.
The CopyOnWrite mechanism in HeapStateBackend only deep-copies the 
StateMapEntry wrapper, not the user object. The ListState elements must be 
deep-copied so that the task thread and the snapshot thread never access the 
same object concurrently.
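The hazard described above comes from aliasing: copying a container (a list, or a wrapper such as StateMapEntry) does not copy the user objects inside it. The single-threaded Java sketch below uses a hypothetical `MutableEvent` class, not Flink's real classes, to show that after a shallow copy a mutation through the original list is visible through the copy, while a deep copy is unaffected. With a task thread and a snapshot thread in place of the two lists, that shared object is exactly what would be accessed concurrently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mutable user object stored as a list-state element. */
final class MutableEvent {
    long count;

    MutableEvent(long count) {
        this.count = count;
    }

    MutableEvent deepCopy() {
        return new MutableEvent(count);
    }
}

final class CopyDepthDemo {
    /** New list, same element objects: both lists alias every element. */
    static List<MutableEvent> shallowCopy(List<MutableEvent> src) {
        return new ArrayList<>(src);
    }

    /** New list with independent element objects. */
    static List<MutableEvent> deepCopy(List<MutableEvent> src) {
        List<MutableEvent> copy = new ArrayList<>(src.size());
        for (MutableEvent e : src) {
            copy.add(e.deepCopy());
        }
        return copy;
    }
}
```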


[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2023-12-20 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799232#comment-17799232
 ] 

Zakelly Lan commented on FLINK-33881:
-

[~lijinzhong] Then I have no problem with it. And actually I think there may be 
some value if we could make sure it is safe to do shallow copy.


[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2023-12-20 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799012#comment-17799012
 ] 

Jinzhong Li commented on FLINK-33881:
-

[~Zakelly] Sorry for my typo; I don't think we should change the deep copy to a 
shallow copy.

I've already revised the description: 1) if no expired element is found in the 
list, return the original list; 2) if an expired element is found, deep-copy 
the unexpired elements.


[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2023-12-19 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798825#comment-17798825
 ] 

Zakelly Lan commented on FLINK-33881:
-

Thanks for the clarification! It is definitely useful, but I'm not sure whether 
it is safe to do a shallow copy instead of a deep copy.

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> 
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Minor
> Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)' consumes a lot of CPU.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that even if none of the elements have expired, 
> TtlListState#getUnexpiredOrNull still copies all of them, and the whole 
> list/map is then updated in TtlIncrementalCleanup#runCleanup().
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull as follows:
> 1) find the index of the first expired element in the list;
> 2) if none is found, return the original list;
> 3) if one is found, construct the unexpired list (put the elements before it 
> into the list), then go through the subsequent elements, adding the unexpired 
> ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     // ...
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues; // return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             unexpired.add(ttlValues.get(i)); // shallow: reuses the original element object
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 unexpired.add(ttlValues.get(i)); // shallow: reuses the original element object
>             }
>         }
>     }
>     // ...
> } {code}
> *In this way, the extra iteration overhead is very small, but the benefit 
> when there are no expired elements is significant.*



[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2023-12-19 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798799#comment-17798799
 ] 

Jinzhong Li commented on FLINK-33881:
-

[~Zakelly] Thanks for your reply.

I think this ticket targets a different optimization point than FLINK-30088.

The approach described in this ticket avoids copying TtlListState elements when 
there is no expired data, whereas FLINK-30088 still copies the list elements, 
consuming a lot of unnecessary CPU.


[jira] [Commented] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2023-12-19 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798648#comment-17798648
 ] 

Zakelly Lan commented on FLINK-33881:
-

Thanks [~lijinzhong] for reporting this! However, I think it may be a duplicate 
of FLINK-30088.
