[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323295#comment-17323295
 ] 

Flink Jira Bot commented on FLINK-12351:


This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, AsyncWaitOperator directly puts the input StreamElement into 
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement 
> is reused, which means the element already in {{StreamElementQueue}} may be 
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when 
> object reuse is enabled, like this: 
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209
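A minimal, stdlib-only Java sketch of the aliasing problem described above. It does not use Flink classes. The hypothetical `MutableRecord` stands in for a reused StreamRecord, and a plain `ArrayDeque` stands in for StreamElementQueue. All inputs are buffered before any result is emitted, mimicking slow asynchronous calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReuseAliasingDemo {
    // Hypothetical mutable record, standing in for a reused StreamRecord.
    static final class MutableRecord {
        int value;
        MutableRecord(int value) { this.value = value; }
        MutableRecord copy() { return new MutableRecord(value); }
    }

    // An upstream producer emits every value through ONE reused object,
    // while the buffer keeps what it receives until the "async results" arrive.
    static List<Integer> bufferAndDrain(int[] inputs, boolean copyOnEnqueue) {
        Deque<MutableRecord> queue = new ArrayDeque<>();
        MutableRecord reused = new MutableRecord(0);
        for (int in : inputs) {
            reused.value = in; // object reuse: overwrite the same instance in place
            queue.addLast(copyOnEnqueue ? reused.copy() : reused);
        }
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.pollFirst().value); // emitted only after all inputs were buffered
        }
        return out;
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println(bufferAndDrain(inputs, false)); // [3, 3, 3]
        System.out.println(bufferAndDrain(inputs, true));  // [1, 2, 3]
    }
}
```

Without the copy, all three queue entries are the same object and report its last value; copying on enqueue, which is the idea behind the proposed deep copy, keeps each buffered value intact.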



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181094#comment-17181094
 ] 

Piotr Nowojski commented on FLINK-12351:


Yes, that's true. I was more worried about relying on the assumption that the 
network stack does not reuse the records in any way. But maybe this is not a big 
issue, and it could be guarded by a unit test for `AsyncWaitOperator`.
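One way such a guard could look, sketched without Flink or JUnit so it stays self-contained. The hypothetical `CopyingBuffer` plays the role of an operator that copies its inputs on entry, and the check asserts that its output is the same whether upstream hands over one reused instance or a fresh instance per record. Because it never goes through the network stack, it does not depend on how network records are created. A real test would drive AsyncWaitOperator through Flink's operator test harnesses instead; all names here are assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObjectReuseRegressionCheck {
    // Hypothetical mutable input record.
    static final class Box {
        String payload;
        Box(String payload) { this.payload = payload; }
    }

    // Hypothetical buffering operator under test: it copies each input on entry.
    static final class CopyingBuffer {
        private final ArrayDeque<Box> queue = new ArrayDeque<>();

        void processElement(Box in) {
            queue.addLast(new Box(in.payload)); // defensive copy before buffering
        }

        List<String> drain() {
            List<String> out = new ArrayList<>();
            while (!queue.isEmpty()) {
                out.add(queue.pollFirst().payload);
            }
            return out;
        }
    }

    // Feeds the inputs through either one reused instance or a fresh instance per record.
    static List<String> run(List<String> inputs, boolean objectReuse) {
        CopyingBuffer op = new CopyingBuffer();
        Box reused = new Box(null);
        for (String s : inputs) {
            Box in = objectReuse ? reused : new Box(null);
            in.payload = s;
            op.processElement(in);
        }
        reused.payload = "clobbered"; // upstream keeps mutating its instance after emitting
        return op.drain();
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        List<String> withReuse = run(inputs, true);
        List<String> withoutReuse = run(inputs, false);
        if (!withReuse.equals(inputs) || !withoutReuse.equals(inputs)) {
            throw new AssertionError("output depends on object reuse: " + withReuse);
        }
        System.out.println("ok: " + withReuse);
    }
}
```

Dropping the copy in `processElement` makes the object-reuse run fail the check.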

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181081#comment-17181081
 ] 

Wenlong Lyu commented on FLINK-12351:

[~pnowojski] As far as I know, it is easy to check whether the operator is the 
head of a chain via StreamConfig#isChainStart.
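A sketch of the decision being discussed: copy only when object reuse is enabled and the operator is not the head of a chain. In Flink the two flags would presumably come from ExecutionConfig#isObjectReuseEnabled and StreamConfig#isChainStart; here they are plain booleans, and the `UnaryOperator` copier is a hypothetical stand-in for a serializer's copy method. The head-of-chain shortcut rests on the assumption questioned in this thread, namely that records arriving from the network are never reused.

```java
import java.util.function.UnaryOperator;

class ConditionalCopyPolicy {
    // Hypothetical policy: decide once (e.g. at operator setup) whether inputs need a defensive copy.
    static boolean needsDefensiveCopy(boolean objectReuseEnabled, boolean isChainStart) {
        // Without object reuse, upstream code is assumed never to mutate an instance it already emitted.
        // At the head of a chain, records are assumed to be freshly deserialized from the network.
        return objectReuseEnabled && !isChainStart;
    }

    // Returns either the copier or a pass-through, so the per-record path has no extra branching.
    static <T> UnaryOperator<T> inputPreparer(
            boolean objectReuseEnabled, boolean isChainStart, UnaryOperator<T> copier) {
        return needsDefensiveCopy(objectReuseEnabled, isChainStart)
                ? copier
                : UnaryOperator.<T>identity();
    }

    public static void main(String[] args) {
        UnaryOperator<int[]> deepCopy = a -> a.clone();
        int[] record = {42};
        int[] chained = inputPreparer(true, false, deepCopy).apply(record);
        int[] headOfChain = inputPreparer(true, true, deepCopy).apply(record);
        System.out.println(chained != record);     // true: copied
        System.out.println(headOfChain == record); // true: passed through unchanged
    }
}
```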

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180345#comment-17180345
 ] 

Piotr Nowojski commented on FLINK-12351:


I think this is a valid concern. I'm not sure how much the performance impact 
matters here (probably not much, judging by the common AsyncWaitOperator 
use cases). We could try to avoid the deep copy overhead when the operator is 
the head of the chain, but I'm not sure how elegant it would be to depend on 
such behaviour.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180304#comment-17180304
 ] 

Till Rohrmann commented on FLINK-12351:

Thanks for reporting this issue [~wenlong.lwl]. Pulling in [~AHeise] and 
[~pnowojski] who worked on FLINK-16219 and who might be able to tell more about 
the implications of this change.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-18 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180209#comment-17180209
 ] 

Wenlong Lyu commented on FLINK-12351:

Hi, [~jark] [~trohrmann], I think we may need to fix this issue in 1.11, since 
it may be a regression in 1.11.

Before 1.11, AsyncWaitOperator was not chainable because of 
[FLINK-13063|https://issues.apache.org/jira/browse/FLINK-13063], so all input 
records were newly created when deserialized from network inputs, and this bug 
could not be triggered.

In 1.11, AsyncWaitOperator is chainable 
again ([FLINK-16219|https://issues.apache.org/jira/browse/FLINK-16219]), so this 
bug can affect the result when object reuse is enabled.
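A small stdlib-only illustration of why chaining matters here, under simplifying assumptions: a 4-byte int encoding stands in for Flink's record serialization, and the hypothetical `Holder` stands in for a user record. Records read from the network are deserialized into fresh objects, whereas a chained upstream operator with object reuse can pass the same instance every time, which is exactly the case a buffering operator has to guard against.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

class InputPathDemo {
    // Hypothetical user record.
    static final class Holder { int value; }

    // Network path: each record is serialized to bytes and deserialized into a NEW object.
    static List<Holder> viaNetwork(int[] values) {
        List<Holder> received = new ArrayList<>();
        for (int v : values) {
            byte[] wire = ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
            Holder h = new Holder();
            h.value = ByteBuffer.wrap(wire).getInt();
            received.add(h);
        }
        return received;
    }

    // Chained path with object reuse: the upstream operator hands over the same instance every time.
    static List<Holder> viaChainWithReuse(int[] values) {
        List<Holder> received = new ArrayList<>();
        Holder reused = new Holder();
        for (int v : values) {
            reused.value = v;
            received.add(reused);
        }
        return received;
    }

    // Counts distinct object identities (not equal values) among the received records.
    static int distinctInstances(List<Holder> records) {
        Set<Holder> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(records);
        return seen.size();
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3};
        System.out.println(distinctInstances(viaNetwork(values)));        // 3
        System.out.println(distinctInstances(viaChainWithReuse(values))); // 1
    }
}
```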

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-04-29 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829298#comment-16829298
 ] 

Jark Wu commented on FLINK-12351:

[~rmetzger] Thanks for the reminder. 

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-04-29 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829245#comment-16829245
 ] 

Robert Metzger commented on FLINK-12351:


Hey [~jark], thanks a lot for opening a ticket in the FLINK bug tracker. I just 
manually assigned the ticket to a component.

For future tickets, please remember to always assign a new issue to a 
component, so that the component owner can pick it up.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-04-29 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829072#comment-16829072
 ] 

Till Rohrmann commented on FLINK-12351:

Go ahead with fixing this problem [~jark].

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-04-28 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16828869#comment-16828869
 ] 

Jark Wu commented on FLINK-12351:

Hi [~aitozi], I think fixing the bug in AsyncWaitOperator and enabling object 
reuse at the operator level are two orthogonal problems. We can create another 
JIRA to discuss operator-level object reuse.

So far, I have only found AsyncWaitOperator to be affected, because it doesn't 
deep copy the input record before putting it into a heap buffer (a Java ArrayDeque).

IMO, AsyncWaitOperator should output the same result whether object reuse is 
enabled or not, because it is framework code, not user code.

Hi [~till.rohrmann], what do you think about this? If you don't object, I can 
create a PR for this.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-04-28 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827861#comment-16827861
 ] 

aitozi commented on FLINK-12351:


Hi, [~jark]

I just checked how other operators use the object reuse config and found that it 
is mostly checked in batch-related operators. Is AsyncWaitOperator the only one 
affected? I think other operators may be affected too. If users enable object 
reuse without paying attention to the side effects of mutating objects, as the 
doc of ExecutionConfig#enableObjectReuse warns, the result may be unpredictable. 
Alternatively, we could make objectReuse configurable at the operator level so 
that users can configure the behaviour of each operator individually. What do 
you think?
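A hypothetical sketch of the operator-level setting proposed above, assuming a three-valued per-operator option that falls back to the job-wide switch set via ExecutionConfig#enableObjectReuse. The enum and method names are invented for illustration and are not part of Flink's API.

```java
class ObjectReuseResolution {
    // Hypothetical per-operator option; INHERIT defers to the job-wide setting.
    enum OperatorObjectReuse { INHERIT, ENABLED, DISABLED }

    static boolean effectiveObjectReuse(boolean jobWideReuse, OperatorObjectReuse setting) {
        switch (setting) {
            case ENABLED:
                return true;
            case DISABLED:
                return false;
            default:
                return jobWideReuse; // INHERIT
        }
    }

    public static void main(String[] args) {
        System.out.println(effectiveObjectReuse(true, OperatorObjectReuse.INHERIT));  // true
        System.out.println(effectiveObjectReuse(true, OperatorObjectReuse.DISABLED)); // false
        System.out.println(effectiveObjectReuse(false, OperatorObjectReuse.ENABLED)); // true
    }
}
```

Keeping INHERIT as the default means existing jobs would behave as before unless a specific operator opts in or out.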

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>