[jira] [Closed] (FLINK-33128) TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

2023-10-26 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-33128.
---

> TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on 
> converter
> -
>
> Key: FLINK-33128
> URL: https://issues.apache.org/jira/browse/FLINK-33128
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jerome Gagnon
>Assignee: Jerome Gagnon
>Priority: Major
>  Labels: pull-request-available
>
> When using the TestValues connector with nested Row values relying on 
> BinaryArrayWriter, the following exception happens: 
> {code:java}
> java.lang.NullPointerException: Cannot invoke 
> "org.apache.flink.table.data.writer.BinaryArrayWriter.getNumElements()" 
> because "this.reuseWriter" is null
>     at 
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.allocateWriter(ArrayObjectArrayConverter.java:140)
>     at 
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toBinaryArrayData(ArrayObjectArrayConverter.java:114)
>     at 
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:93)
>     at 
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
>     at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
>     at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
>     at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
>     at 
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
>     at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>     at 
> org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
>     at 
> org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.lambda$indexDataByKey$0(TestValuesRuntimeFunctions.java:626)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>     at 
> org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.indexDataByKey(TestValuesRuntimeFunctions.java:624)
>     at 
> org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.open(TestValuesRuntimeFunctions.java:601)
>     at LookupFunction$370.open(Unknown Source)
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.open(LookupJoinRunner.java:67)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner.open(LookupJoinWithCalcRunner.java:51)
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
>     at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550){code}
>  
> This 
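
The fix implied by the title is to call open() on the DataStructureConverter before the lookup function converts its rows. Below is a minimal standalone sketch of that contract, assuming a flink-table-runtime dependency; DataStructureConverters and open(ClassLoader) are the public conversion API seen in the stack trace, while the scaffold class and the row type are illustrative:

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class ConverterOpenSketch {
    public static void main(String[] args) {
        // A nested row with an array field, the shape that triggers the NPE above.
        DataType type = DataTypes.ROW(DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));
        DataStructureConverter<Object, Object> converter =
                DataStructureConverters.getConverter(type);

        // This is the call TestValuesLookupFunction skipped: open() initializes
        // lazy internals such as ArrayObjectArrayConverter's reuseWriter. Remove
        // it and toInternal(...) fails as in the stack trace above.
        converter.open(ConverterOpenSketch.class.getClassLoader());

        Object internal = converter.toInternal(Row.of((Object) new Integer[] {1, 2, 3}));
        System.out.println(internal instanceof RowData); // true
    }
}
{code}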

[jira] [Resolved] (FLINK-33128) TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

2023-10-26 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao resolved FLINK-33128.
-
Resolution: Fixed


[jira] [Updated] (FLINK-33128) TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

2023-10-26 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-33128:

Affects Version/s: 1.19.0
   (was: 1.16.2)


[jira] [Assigned] (FLINK-33128) TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

2023-10-26 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-33128:
---

Assignee: Jerome Gagnon


[jira] [Commented] (FLINK-33128) TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

2023-10-26 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779959#comment-17779959
 ] 

Yun Gao commented on FLINK-33128:
-

Merged on master via f31770fcf5769052f1ac32a6529de979eaf339a4


[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2023-07-23 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746204#comment-17746204
 ] 

Yun Gao commented on FLINK-29459:
-

Hi [~martijnvisser], logically it does not involve an API change. Do you think 
we should move the discussion to the mailing-list thread, or could we continue 
it under this issue? Both work for me. 

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> --
>
> Key: FLINK-29459
> URL: https://issues.apache.org/jira/browse/FLINK-29459
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Currently, when supporting Sink implementations that use the version 1 
> interface, there are issues after restoring from a checkpoint after a failover:
>  # In the global committer operator, when restoring SubtaskCommittableManager, 
> the subtask id is replaced with the one of the current operator. That is, the 
> id is originally the id of the sender task (0 ~ N - 1), but after restoring it 
> is always 0. This causes a duplicate-key exception during restore (see the 
> sketch after this quote).
>  # For the Committer operator, the subtaskId of CheckpointCommittableManagerImpl 
> is always restored to 0 after failover for all subtasks, so the summary sent 
> to the Global Committer carries the wrong subtask id.
>  # For the Committer operator, the checkpoint id of SubtaskCommittableManager 
> is always restored to 1 after failover, so the following committables sent to 
> the global committer carry the wrong checkpoint id. 
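
A minimal sketch of the duplicate-key failure in item 1, indexing restored managers by subtask id the way a Collectors.toMap-style restore would; the Manager record is a stand-in for SubtaskCommittableManager, not Flink's actual code:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeySketch {
    // Stand-in for SubtaskCommittableManager; only the subtask id matters here.
    record Manager(int subtaskId) {}

    public static void main(String[] args) {
        // State written by sender subtasks 0 and 1, but each restored with its id
        // overwritten by the restoring operator's own subtask id (always 0).
        List<Manager> restored = List.of(new Manager(0), new Manager(0));

        try {
            // Indexing by subtask id now collides, producing the duplicate-key
            // exception described in item 1.
            Map<Integer, Manager> byId = restored.stream()
                    .collect(Collectors.toMap(Manager::subtaskId, m -> m));
            System.out.println(byId);
        } catch (IllegalStateException e) {
            System.out.println(e); // java.lang.IllegalStateException: Duplicate key 0 ...
        }
    }
}
{code}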



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743256#comment-17743256
 ] 

Yun Gao commented on FLINK-32455:
-

Hi [~renqs] [~tzulitai], sorry for the late reply. It might take some time to 
finish the above fix, so I think it would indeed be preferable to merge the 
hotfix first. I'll try to finish the fix as soon as possible. 

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}; specifically, the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}.
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on the flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-07-05 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740371#comment-17740371
 ] 

Yun Gao commented on FLINK-30238:
-

Hi [~jingge], may I double-check the detailed issue now? In the discussion 
above there seems to be some divergence on the cause of this issue. 

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * The last committableSummary has checkpoint id Long.MAX_VALUE and is never 
> cleared from the state, so stop-with-savepoint does not work when the 
> pipeline recovers from such a savepoint. 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-04 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740024#comment-17740024
 ] 

Yun Gao commented on FLINK-32455:
-

Thanks [~tzulitai] for the quick fix! 

For the formal fix, I think we might use the same approach, namely:
 # Revert `TypeSerializerUpgradeTestBase` so that the connector libraries can 
continue to use it.
 # Introduce a new `MigratedTypeSerializerUpgradeTestBase` and migrate all the 
tests inside the Flink repository to it (a rough sketch follows below).
 # After 1.18 is published, move `MigratedTypeSerializerUpgradeTestBase` back to 
`TypeSerializerUpgradeTestBase`, and also migrate the tests in the connector 
libraries. 

What do you think about this?
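
A rough sketch of steps 1 and 2, purely illustrative; the method shapes, type parameters, and the FlinkVersion stand-in are assumptions, not the actual Flink signatures:

{code:java}
import java.util.Collection;

// Stand-in for org.apache.flink.FlinkVersion, to keep the sketch self-contained.
enum FlinkVersion { v1_17, v1_18 }

// Step 1: revert the legacy base class to its old shape so that external
// connector repos (e.g. flink-connector-kafka) keep compiling against it.
abstract class TypeSerializerUpgradeTestBase<SpecT> {
    abstract Collection<SpecT> createTestSpecifications() throws Exception;
}

// Step 2: Flink-internal tests migrate to a new base class that carries the
// version-parameterized signature introduced by FLINK-27518.
abstract class MigratedTypeSerializerUpgradeTestBase<SpecT> {
    abstract Collection<SpecT> createTestSpecifications(FlinkVersion version) throws Exception;
}
{code}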




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-06-29 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738503#comment-17738503
 ] 

Yun Gao commented on FLINK-32455:
-

May I first double-check the direction we'd like to go:

1. As a whole, FLINK-27518 aims to avoid manually generating new snapshots for 
migration tests when publishing a new version. Since there are also migration 
tests in the Kafka repo, I think it is useful to introduce the improvement 
there as well. 
2. For the flink-migration-test-utils dependency, it now requires the module 
using migration tests to declare the dependency directly, so it should not be 
a blocker either. 

If we think it is ok to go this way, I could open a PR to refactor the existing 
migration tests to the new framework. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-06-29 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738502#comment-17738502
 ] 

Yun Gao commented on FLINK-32455:
-

Sorry for missing the external repositories; I'll also double-check here. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-27518) Refactor migration tests to support version update automatically

2023-05-12 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-27518.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> Refactor migration tests to support version update automatically
> 
>
> Key: FLINK-27518
> URL: https://issues.apache.org/jira/browse/FLINK-27518
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, on releasing each version, we need to manually generate the 
> snapshots for every migration test and update the current versions. As more 
> and more migration tests are added, this has become increasingly intractable. 
> It would be better if this happened automatically when cutting new branches 
> (see the sketch below). 
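
As an illustration of the intended automation, the set of versions a migration test covers can come from a single range instead of a per-test hard-coded constant. A self-contained sketch; the FlinkVersion enum here is a stand-in for org.apache.flink.FlinkVersion, and the bounds are assumptions:

{code:java}
import java.util.EnumSet;
import java.util.Set;

public class MigrationVersionsSketch {
    // Stand-in for org.apache.flink.FlinkVersion, the real enum of release versions.
    enum FlinkVersion { v1_14, v1_15, v1_16, v1_17 }

    public static void main(String[] args) {
        // One source of truth for the covered version range: bumping the upper
        // bound when cutting a release branch updates every migration test at
        // once, with no per-test "current version" edits.
        Set<FlinkVersion> versions = EnumSet.range(FlinkVersion.v1_14, FlinkVersion.v1_17);
        versions.forEach(v -> System.out.println("generate/verify snapshot for " + v));
    }
}
{code}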



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically

2023-05-12 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722035#comment-17722035
 ] 

Yun Gao commented on FLINK-27518:
-

Merged on master via 82fb74e23b1dfa46fa98c89a58d7f3126aeaec6c. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record

2023-04-20 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-31708.
---
Resolution: Fixed

> RuntimeException/KryoException thrown when deserializing an empty protobuf 
> record
> -
>
> Key: FLINK-31708
> URL: https://issues.apache.org/jira/browse/FLINK-31708
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.10.0, 1.17.0
>Reporter: shen
>Assignee: shen
>Priority: Major
>  Labels: pull-request-available
>
> h1. Problem description
> I am using a protobuf-generated class in a Flink job. When the application 
> runs in production, the job throws the following exception:
> {code:java}
> java.lang.RuntimeException: Could not create class com.MYClass < 
> generated by protobuf
> at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
> at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
> more bytes left.
> at 
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
> at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
> at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
> ... 16 common frames omitted
>  {code}
> h1. How to reproduce
> I think this is similar to another issue: FLINK-29347.
> Following is an example to reproduce the problem:
> {code:java}
> package com.test;
> import com.test.ProtobufGeneratedClass;
> import com.google.protobuf.Message;
> import com.twitter.chill.protobuf.ProtobufSerializer;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.java.utils.MultipleParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import 
> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
> import java.util.Random;
> @Slf4j
> public class app {
>   public static final OutputTag OUTPUT_TAG_1 =
>   new OutputTag("output-tag-1") {
>   };
>   public static final OutputTag OUTPUT_TAG_2 =
>   new OutputTag("output-tag-2") {
> 
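
The quoted reproduction is truncated above. As a compact, hedged alternative, the failing configuration boils down to registering chill-protobuf's ProtobufSerializer for a protobuf type whose instances serialize to zero bytes; com.google.protobuf.Empty is used here for exactly that property, and the wiring is an assumption rather than the reporter's exact job:

{code:java}
import com.google.protobuf.Empty;
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.api.common.ExecutionConfig;

public class EmptyProtobufSketch {
    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        // Route Kryo (de)serialization of the protobuf type through chill-protobuf.
        // An empty message serializes to zero bytes; on read, ProtobufSerializer
        // asks Input.readBytes for more data than exists, and Flink's
        // NoFetchingInput answers with "java.io.EOFException: No more bytes left",
        // matching the stack trace above.
        config.registerTypeWithKryoSerializer(Empty.class, ProtobufSerializer.class);
    }
}
{code}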

[jira] [Commented] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record

2023-04-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714437#comment-17714437
 ] 

Yun Gao commented on FLINK-31708:
-

Merged on master via d9e9d1ca741c84a18dbfac0ed44bd38cb5e11f3f

Merged on release-1.17 via 729043df687a96711d3591fcdf5e8e712cd21b87

Merged on release-1.16 via 389389fabd0eeb7894065e42f395db67f3e722e1


[jira] [Assigned] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record

2023-04-19 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-31708:
---

Assignee: shen


[jira] [Assigned] (FLINK-31632) watermark aligned idle source can't resume

2023-04-13 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-31632:
---

Assignee: haishui

> watermark aligned idle source can't resume
> --
>
> Key: FLINK-31632
> URL: https://issues.apache.org/jira/browse/FLINK-31632
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.16.1, 1.15.4
>Reporter: haishui
>Assignee: haishui
>Priority: Critical
>  Labels: pull-request-available
>
>  
> {code:java}
> WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
>         .<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
>         .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))
>         .withWatermarkAlignment("group", Duration.ofMillis(10), Duration.ofSeconds(2))
>         .withIdleness(Duration.ofSeconds(10));
> DataStreamSource<String> s1 = env.fromSource(kafkaSource("topic1"), watermarkStrategy, "S1");
> DataStreamSource<String> s2 = env.fromSource(kafkaSource("topic2"), watermarkStrategy, "S2");{code}
> send "0" to kafka topic1 and topic2
>  
> After 10s, source1 and source2 is idle,and logs are
>  
> {code:java}
> 09:44:30,403 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:30,404 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:32,019 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:32,019 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:32,417 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:32,418 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:34,028 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:34,028 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:34,423 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 15:12:55.807) from subTaskId=0
> 09:44:34,424 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 15:12:55.807) from subTaskId=0
> 09:44:36,023 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
> 09:44:36,023 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
> 09:44:36,433 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 15:12:55.807) from subTaskId=0
> 09:44:36,433 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 15:12:55.807) from subTaskId=0 {code}
> Send a message to topic1 or topic2 now; the message can't be consumed.
>  
> The reason is: 
> when a source is marked idle, lastEmittedWatermark = Long.MAX_VALUE and 
> currentMaxDesiredWatermark = Long.MAX_VALUE + maxAllowedWatermarkDrift in 
> org.apache.flink.streaming.api.operators.SourceOperator.
> currentMaxDesiredWatermark overflows to a negative value and is therefore 
> always less than lastEmittedWatermark, so operatingMode is always 
> WAITING_FOR_ALIGNMENT (see the overflow sketch below).
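
A minimal sketch of the overflow described above; the 10 ms drift matches the withWatermarkAlignment setting in the repro:

{code:java}
public class WatermarkOverflowSketch {
    public static void main(String[] args) {
        long lastEmittedWatermark = Long.MAX_VALUE; // watermark of an idle source
        long maxAllowedWatermarkDrift = 10;         // Duration.ofMillis(10) above

        // Long.MAX_VALUE + 10 wraps around to a negative value, the exact number
        // seen in the logs ("Distributing maxAllowedWatermark=-9223372036854775799").
        long currentMaxDesiredWatermark = lastEmittedWatermark + maxAllowedWatermarkDrift;
        System.out.println(currentMaxDesiredWatermark); // -9223372036854775799

        // The desired watermark is therefore always behind the emitted one, so
        // the operator never leaves WAITING_FOR_ALIGNMENT.
        System.out.println(currentMaxDesiredWatermark < lastEmittedWatermark); // true
    }
}
{code}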



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner

2023-03-30 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707102#comment-17707102
 ] 

Yun Gao commented on FLINK-31655:
-

Hi [~tartarus], thanks for the proposal; it also looks useful from my side. 

There have also been some discussions about this in the community previously, 
and a few points were raised, e.g. that we should not introduce new locks and 
that the structure maintaining the active channels must be designed carefully 
to avoid additional overhead. So, if convenient, I think you could propose a 
design doc so that we can first reach consensus on the overall design. 

Also cc [~pltbkd], who has implemented this functionality previously in an 
internal version. 

> Adaptive Channel selection for partitioner
> --
>
> Key: FLINK-31655
> URL: https://issues.apache.org/jira/browse/FLINK-31655
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>
> In Flink, if the upstream and downstream operator parallelism is not the 
> same, then by default the RebalancePartitioner is used to select the target 
> channel.
> In our company, users often use Flink to access Redis, HBase or other RPC 
> services. If some of the operators are slow to return requests (for 
> external-service reasons), then, because Rebalance/Rescale use a round-robin 
> channel-selection policy, the job easily backpressures.
> Because the Rebalance/Rescale policy does not care which downstream subtask 
> the data is sent to, we expect Rebalance/Rescale to take the processing 
> capacity of the downstream subtasks into account when choosing a channel.
> Sending more data to the free subtasks ensures the best possible throughput 
> of the job (a sketch of the idea follows below).
>  
>  
>  
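> A minimal, framework-agnostic sketch of the idea (this is not Flink's actual 
> partitioner API and not the proposed design; all names are illustrative): 
> prefer the channel with the smallest backlog instead of plain round-robin.
> {code:java}
> import java.util.concurrent.atomic.AtomicLongArray;
> 
> public class LeastLoadedChannelSelector {
>     private final AtomicLongArray inFlight; // per-channel outstanding records
> 
>     public LeastLoadedChannelSelector(int numberOfChannels) {
>         this.inFlight = new AtomicLongArray(numberOfChannels);
>     }
> 
>     /** Pick the channel with the smallest backlog; ties go to the lowest index. */
>     public int selectChannel() {
>         int best = 0;
>         for (int i = 1; i < inFlight.length(); i++) {
>             if (inFlight.get(i) < inFlight.get(best)) {
>                 best = i;
>             }
>         }
>         inFlight.incrementAndGet(best);
>         return best;
>     }
> 
>     /** Called when the downstream subtask acknowledges/consumes a record. */
>     public void onAck(int channel) {
>         inFlight.decrementAndGet(channel);
>     }
> }
> {code}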



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-03-06 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697246#comment-17697246
 ] 

Yun Gao commented on FLINK-30238:
-

Hi [~agg_neha07] I think the changed behavior of intermediate savepoints 
(namely the ones not triggered by stop-with-savepoint) comes from the 
modification to skip savepoints for recovery: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership],
 which was merged in 1.15. With this modification, an intermediate savepoint 
will only take a snapshot and will not commit side effects. 

May I double-check the details of how this change caused issues for the delta 
sink pipeline? 

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides:
>  * The last committableSummary has checkpoint id LONG.MAX and is never 
> cleared from the state, which means stop-with-savepoint does not work when 
> the pipeline recovers from such a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-03-06 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697241#comment-17697241
 ] 

Yun Gao commented on FLINK-30238:
-

Hi [~fpaul] sorry for the long delay; it took me some time to double-check:
 # Currently endOfInput() should only be called when there is no new data or 
on stop-with-savepoint --drain. For these two cases, the job is logically 
"terminated", namely it should not be restarted again: 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
 # For a normal savepoint or stop-with-savepoint without draining, 
endOfInput() will not be called, and the job is logically "suspended" and can 
be restarted from the savepoint again. 
 # For this specific Kafka test case, I think it should not use 
stopWithSavepoint with advancedTimestamp = true (which is in fact translated 
to stop-with-savepoint --drain in the end), since it tries to create a 
savepoint that the job can be restarted from. 

 

Thus, for the real-world case, if a job has had endOfInput() called, it is 
expected not to be restarted any more. 

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides:
>  * The last committableSummary has checkpoint id LONG.MAX and is never 
> cleared from the state, which means stop-with-savepoint does not work when 
> the pipeline recovers from such a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-02-15 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689588#comment-17689588
 ] 

Yun Gao commented on FLINK-30238:
-

Hi [~pnowojski], for this specific issue, based on my understanding:
 # The first issue seems to refer to the situation that on endOfInput(), the 
Writer emits the summary message tagged with \{checkpoint id = MAX_VALUE} to 
the Committer, followed by the remaining Committables. But with the current 
process, endOfInput() should not be called on stop-with-savepoint without 
draining, so sorry, I also do not fully understand this part. 
 # The second issue seems to refer to how the last piece of the Committables 
should be passed to the post-committer topology, as posted above. Although it 
refers to stop-with-savepoint, it looks to me that it should be 
stop-with-savepoint --drain, since for plain stop-with-savepoint the last 
piece of Committables should still be recorded in the last savepoint, and 
they will be re-committed after the restart. 

Perhaps [~fpaul] could give some more details?

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides:
>  * The last committableSummary has checkpoint id LONG.MAX and is never 
> cleared from the state, which means stop-with-savepoint does not work when 
> the pipeline recovers from such a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26173) Sink GlobalCommitter's behavior is not compatible

2023-02-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686323#comment-17686323
 ] 

Yun Gao commented on FLINK-26173:
-

I'll close this issue for now since it seems the PR has been merged. 

> Sink GlobalCommitter's behavior is not compatible
> -
>
> Key: FLINK-26173
> URL: https://issues.apache.org/jira/browse/FLINK-26173
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.0
>Reporter: Jingsong Lee
>Assignee: Fabian Paul
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> GlobalCommitter's behavior is not compatible:
>  * filterRecoveredCommittables is never invoked; previously it would be 
> called on recovery
>  * GlobalCommT is solidified after being produced by GlobalCommitter.combine, 
> instead of being created every time



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-26173) Sink GlobalCommitter's behavior is not compatible

2023-02-09 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-26173.
---
Fix Version/s: 1.15.0
   Resolution: Fixed

> Sink GlobalCommitter's behavior is not compatible
> -
>
> Key: FLINK-26173
> URL: https://issues.apache.org/jira/browse/FLINK-26173
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.0
>Reporter: Jingsong Lee
>Assignee: Fabian Paul
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> GlobalCommitter's behavior is not compatible:
>  * filterRecoveredCommittables is never invoked; previously it would be 
> called on recovery
>  * GlobalCommT is solidified after being produced by GlobalCommitter.combine, 
> instead of being created every time



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-26970) Fix comment style error

2023-02-09 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-26970.
---
Resolution: Not A Problem

> Fix comment style error
> ---
>
> Key: FLINK-26970
> URL: https://issues.apache.org/jira/browse/FLINK-26970
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.14.3
>Reporter: codingcaproni
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> Fix comment style error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26970) Fix comment style error

2023-02-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686321#comment-17686321
 ] 

Yun Gao commented on FLINK-26970:
-

Hi [~codingcaproni] many thanks for the efforts. Based on the above 
discussion, I'll close this issue for now. 

> Fix comment style error
> ---
>
> Key: FLINK-26970
> URL: https://issues.apache.org/jira/browse/FLINK-26970
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.14.3
>Reporter: codingcaproni
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> Fix comment style error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30448) "filter(Objects::nonNull)" will bring down task with failure cause: ClassCastException

2023-02-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686305#comment-17686305
 ] 

Yun Gao commented on FLINK-30448:
-

Many thanks [~pltbkd] for the analysis! Hi [~float2net], based on the above 
analysis it looks to me that this issue comes from a limitation of using a 
Java static method reference as a lambda, so I'll close this issue for now. 
Feel free to reopen it if there is more to be checked. 

> "filter(Objects::nonNull)" will bring down task with failure cause: 
> ClassCastException
> --
>
> Key: FLINK-30448
> URL: https://issues.apache.org/jira/browse/FLINK-30448
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
> Environment: test both on flink 1.15.1 and flink 1.16.0
> Intellij-Idea dev environment run
>Reporter: Yong
>Priority: Major
> Attachments: TestSideOutput.java
>
>
> Attached is an *all-in-one* Java program which can run locally in a dev 
> environment (e.g. IntelliJ IDEA -> run). It consumes stream objects from 
> elements; the object schema is a parent containing two child fields (Child1 
> and Child2). I use *side-output* to map and split out two different 
> sub-streams, one for each child, and I add '{*}filter(Objects::nonNull){*}' 
> on each sub-stream to ignore null objects. When a parent record arrives 
> from the stream {*}in which either one of the children is null{*}, the 
> program brings down the task and produces the error below:
> ..
> switched from RUNNING to FAILED with failure cause: 
> java.lang.{*}ClassCastException{*}: mytest.TestSideOutput$Child2 cannot be 
> cast to mytest.TestSideOutput$Child1. Failed to push OutputTag with id 
> 'child2' to operator. This can occur when multiple OutputTags with different 
> types but identical names are being used.
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
> ..
>  
> However, if I replace '{*}filter(Objects::nonNull){*}' (at lines #71 and 
> #90) with the logically equivalent '{*}filter(x -> x != null){*}' (at lines 
> #70 and #89), everything works fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30448) "filter(Objects::nonNull)" will bring down task with failure cause: ClassCastException

2023-02-09 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-30448.
---
Resolution: Won't Fix

> "filter(Objects::nonNull)" will bring down task with failure cause: 
> ClassCastException
> --
>
> Key: FLINK-30448
> URL: https://issues.apache.org/jira/browse/FLINK-30448
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
> Environment: test both on flink 1.15.1 and flink 1.16.0
> Intellij-Idea dev environment run
>Reporter: Yong
>Priority: Major
> Attachments: TestSideOutput.java
>
>
> Attached is an *all-in-one* Java program which can run locally in a dev 
> environment (e.g. IntelliJ IDEA -> run). It consumes stream objects from 
> elements; the object schema is a parent containing two child fields (Child1 
> and Child2). I use *side-output* to map and split out two different 
> sub-streams, one for each child, and I add '{*}filter(Objects::nonNull){*}' 
> on each sub-stream to ignore null objects. When a parent record arrives 
> from the stream {*}in which either one of the children is null{*}, the 
> program brings down the task and produces the error below:
> ..
> switched from RUNNING to FAILED with failure cause: 
> java.lang.{*}ClassCastException{*}: mytest.TestSideOutput$Child2 cannot be 
> cast to mytest.TestSideOutput$Child1. Failed to push OutputTag with id 
> 'child2' to operator. This can occur when multiple OutputTags with different 
> types but identical names are being used.
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
> ..
>  
> However, if I replace '{*}filter(Objects::nonNull){*}' (at lines #71 and 
> #90) with the logically equivalent '{*}filter(x -> x != null){*}' (at lines 
> #70 and #89), everything works fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-02-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686272#comment-17686272
 ] 

Yun Gao commented on FLINK-30238:
-

Hi, I might first add some background before continuing the discussion:

Currently the sink topology can be simplified as writer -> committer -> 
post-committer topology, and the rough two-phase-commit process is:
 # The Writer writes to a temporary transaction or intermediate file. 
 # The Writer emits Committables, which are the handles of the transactions 
or intermediate files, to the Committer on prepareSnapshotPreBarrier. 
 # The Committer records these Committables on snapshotting. 
 # The Committer commits these Committables on notifyCheckpointComplete. 
 # The Committer emits the committed Committables to the post-committer 
topology.

If the job is bounded, then for the last piece of data the process is 
slightly different, based on the current implementation:
 # The Writer receives the END_OF_DATA message and emits its currently 
pending Committables. Logically it should not emit new records from this 
point on, and it will start waiting for the final checkpoint. 
 # The Committer then also receives END_OF_DATA and waits for the final 
checkpoint. 
 # The Committer records the Committables on the final checkpoint's 
snapshotting. 
 # The Committer commits these Committables on the final checkpoint's 
notifyCheckpointComplete. Since it has already received END_OF_DATA, it 
cannot emit these Committables to the post-committer topology at this point. 

Thus it looks to me that the root issue is how we deal with the last batch of 
Committables. The original thought for this issue was something like:
 # The Committer emits the un-committed Committables to the post-committer 
topology, with special tags. 
 # The post-commit topology might use a connector-specific method to check 
whether these Committables are committed, then wait until all the 
Committables are committed and do the cleanup in the final checkpoint's 
notifyCheckpointComplete. 

What do you think about that?
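A minimal, framework-agnostic sketch of steps 1-5 above (these names are 
illustrative, not Flink's actual sink API; requires Java 16+ for records):

{code:java}
import java.util.ArrayList;
import java.util.List;

class TwoPhaseCommitSketch {
    record Committable(String handle) {} // handle of a transaction / intermediate file

    static class Writer {
        private final List<Committable> pending = new ArrayList<>();

        void write(String record) {                     // 1. write to a temp transaction
            pending.add(new Committable("txn-for-" + record));
        }

        List<Committable> prepareSnapshotPreBarrier() { // 2. hand handles to the Committer
            List<Committable> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    static class Committer {
        private final List<Committable> recorded = new ArrayList<>();

        void snapshotState(List<Committable> committables) { // 3. record in state
            recorded.addAll(committables);
        }

        List<Committable> notifyCheckpointComplete() {        // 4. commit
            List<Committable> committed = new ArrayList<>(recorded);
            recorded.clear();
            return committed; // 5. forwarded to the post-committer topology
        }
    }
}
{code}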

 

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides:
>  * The last committableSummary has checkpoint id LONG.MAX and is never 
> cleared from the state, which means stop-with-savepoint does not work when 
> the pipeline recovers from such a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30906) TwoInputStreamTask passes wrong configuration object when creating input processor

2023-02-05 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-30906:

Component/s: Runtime / Task

> TwoInputStreamTask passes wrong configuration object when creating input 
> processor
> --
>
> Key: FLINK-30906
> URL: https://issues.apache.org/jira/browse/FLINK-30906
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yun Gao
>Priority: Major
>
> It seems _StreamTwoInputProcessorFactory.create_ is passed the wrong 
> configuration object: the taskManagerConfiguration should be 
> _getEnvironment().getTaskManagerInfo().getConfiguration()_. 
>  
> And in the following logic, it indeed seems to try to load TaskManager 
> options from this configuration object, like the state backend and 
> taskmanager.memory.managed.consumer-weights. 
>  
> [1] https://github.com/apache/flink/blob/111342f37bdc0d582d3f7af458d9869f0548299f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java#L98



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30906) TwoInputStreamTask passes wrong configuration object when creating input processor

2023-02-05 Thread Yun Gao (Jira)
Yun Gao created FLINK-30906:
---

 Summary: TwoInputStreamTask passes wrong configuration object when 
creating input processor
 Key: FLINK-30906
 URL: https://issues.apache.org/jira/browse/FLINK-30906
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.1, 1.17.0
Reporter: Yun Gao


It seems _StreamTwoInputProcessorFactory.create_ is passed the wrong 
configuration object: the taskManagerConfiguration should be 
_getEnvironment().getTaskManagerInfo().getConfiguration()_. 
 
And in the following logic, it indeed seems to try to load TaskManager 
options from this configuration object, like the state backend and 
taskmanager.memory.managed.consumer-weights. 
 

[1] https://github.com/apache/flink/blob/111342f37bdc0d582d3f7af458d9869f0548299f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java#L98



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30505) Close the connection between TM and JM when task executor failed

2023-02-02 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-30505.
---
Resolution: Won't Fix

> Close the connection between TM and JM when task executor failed
> 
>
> Key: FLINK-30505
> URL: https://issues.apache.org/jira/browse/FLINK-30505
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.16.0
>Reporter: Yongming Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When the resource manager detects that a task executor has failed, it closes 
> the connection with the task executor. At this time, jobs running on this TM 
> will fail for other reasons (no longer reachable or heartbeat timeout).
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672047809511-a4b8b5d9-f11f-483c-a113-b42290a33250.png|width=1160,id=uc24b1166!
> If we close the connection between the task executor and the job master when 
> the resource manager detects that a task executor has failed, the real 
> reason for the task executor failure will appear in "Root Exception". This 
> will make it easier for users to find problems.
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672048733572-2b5b7be4-087d-46ae-9c8d-6ad5a1344019.png|width=1141,id=u947d8c4e!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30505) Close the connection between TM and JM when task executor failed

2023-02-02 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683269#comment-17683269
 ] 

Yun Gao commented on FLINK-30505:
-

Thanks [~damumu] and [~xtsong] for the discussion! As it seems we have 
already reached consensus, I'll close this issue for now. If there are 
further issues, let's reopen it.

> Close the connection between TM and JM when task executor failed
> 
>
> Key: FLINK-30505
> URL: https://issues.apache.org/jira/browse/FLINK-30505
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.16.0
>Reporter: Yongming Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When the resource manager detects that a task executor has failed, it closes 
> the connection with the task executor. At this time, jobs running on this TM 
> will fail for other reasons (no longer reachable or heartbeat timeout).
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672047809511-a4b8b5d9-f11f-483c-a113-b42290a33250.png|width=1160,id=uc24b1166!
> If we close the connection between the task executor and the job master when 
> the resource manager detects that a task executor has failed, the real 
> reason for the task executor failure will appear in "Root Exception". This 
> will make it easier for users to find problems.
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672048733572-2b5b7be4-087d-46ae-9c8d-6ad5a1344019.png|width=1141,id=u947d8c4e!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30877) Add support for @Before / @After for JUnit 4 and 5

2023-02-02 Thread Yun Gao (Jira)
Yun Gao created FLINK-30877:
---

 Summary: Add support for @Before / @After for JUnit 4 and 5
 Key: FLINK-30877
 URL: https://issues.apache.org/jira/browse/FLINK-30877
 Project: Flink
  Issue Type: Sub-task
  Components: Test Infrastructure
Reporter: Yun Gao


Based on the new migration test infrastructure, we might also support @Before 
/ @After for both JUnit 4 and 5. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically

2023-01-18 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678182#comment-17678182
 ] 

Yun Gao commented on FLINK-27518:
-

Sorry, it took more time than I expected to finish the PR; I'll open the PR 
later today. 

> Refactor migration tests to support version update automatically
> 
>
> Key: FLINK-27518
> URL: https://issues.apache.org/jira/browse/FLINK-27518
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, on releasing each version, we need to manually generate the 
> snapshots for every migration test and update the current versions. As more 
> and more migration tests are added, this has become more and more 
> intractable. It would be better if we could make this happen automatically 
> when cutting new branches. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically

2023-01-02 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653822#comment-17653822
 ] 

Yun Gao commented on FLINK-27518:
-

Got it. I'm now a bit freer and will raise the priority of this issue. I'll 
open the PR by early next week. 

> Refactor migration tests to support version update automatically
> 
>
> Key: FLINK-27518
> URL: https://issues.apache.org/jira/browse/FLINK-27518
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, on releasing each version, we need to manually generate the 
> snapshots for every migration test and update the current versions. As more 
> and more migration tests are added, this has become more and more 
> intractable. It would be better if we could make this happen automatically 
> when cutting new branches. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30468) The SortOrder of BusyRatio should be descend by default

2022-12-29 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-30468:
---

Assignee: Rui Fan

> The SortOrder of BusyRatio should be descend by default
> ---
>
> Key: FLINK-30468
> URL: https://issues.apache.org/jira/browse/FLINK-30468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the sort order is ascending by default; it should be descending.
> The busiest subtask should be displayed on top.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30412) create many checkpoint empty dir when job not enable checkpoint

2022-12-29 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652768#comment-17652768
 ] 

Yun Gao commented on FLINK-30412:
-

Hi [~xiaodao] sorry for the late reply. May I double-check the version you 
are using? From the stack, it seems it is still trying to create a 
JobManagerRunner object, but JobManagerRunner has been an interface since 
1.10: https://issues.apache.org/jira/browse/FLINK-14259 
Since https://issues.apache.org/jira/browse/FLINK-23180 was merged in 1.15, 
if you are using a version earlier than 1.15.0 I reckon it might be fixed by 
upgrading. 

> create many checkpoint empty dir when job not enable checkpoint
> ---
>
> Key: FLINK-30412
> URL: https://issues.apache.org/jira/browse/FLINK-30412
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.12.7, 1.13.6, 1.15.2
>Reporter: xiaodao
>Priority: Major
> Attachments: image-2022-12-23-09-24-58-584.png
>
>
> When we submit jobs to a Flink session cluster, after a long time we find it 
> creates too many empty checkpoint dirs, exceeding the HDFS max node limit.
> I found that StreamingJobGraphGenerator sets the snapshot settings whether 
> or not the job has checkpointing enabled:
> jobGraph.setSnapshotSettings(settings) 
> {code:java}
> private void configureCheckpointing() {
>     CheckpointConfig cfg = streamGraph.getCheckpointConfig();
>     long interval = cfg.getCheckpointInterval();
>     if (interval < MINIMAL_CHECKPOINT_TIME) {
>         // interval of max value means disable periodic checkpoint
>         interval = Long.MAX_VALUE;
>     }
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30412) create many checkpoint empty dir when job not enable checkpoint

2022-12-22 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651167#comment-17651167
 ] 

Yun Gao commented on FLINK-30412:
-

Hi [~xtsong] [~xiaodao], I have double-checked the issue; the logic seems to 
be:
 # The `ExecutionGraphBuilder` seems to decide whether it will create the 
CheckpointCoordinator based on `snapshotSettings == null`. One point that 
seems a bit unexpected is that in DataStream, snapshotSettings is always set, 
which means we also create the CheckpointCoordinator in batch mode. This 
might need some double-checking.
 # For streaming mode, this should be reasonable, since even if users disable 
periodic checkpoints, they may still take savepoints, so we still need to 
create the CheckpointCoordinator.

But I still do not get why the directory is leaked, since after the fix in 
https://issues.apache.org/jira/browse/FLINK-23180, the directory should not 
be created if the checkpoint interval == Long.MAX_VALUE: 
[https://github.com/apache/flink/blob/4ea67f63eb1c43d7adf07c37946b20b525fb015d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L335]
 [~xiaodao] could you also attach the paths of the leaked directories? 

> create many checkpoint empty dir when job not enable checkpoint
> ---
>
> Key: FLINK-30412
> URL: https://issues.apache.org/jira/browse/FLINK-30412
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.12.7, 1.13.6, 1.15.2
>Reporter: xiaodao
>Priority: Major
>
> When we submit jobs to a Flink session cluster, after a long time we find it 
> creates too many empty checkpoint dirs, exceeding the HDFS max node limit.
> I found that StreamingJobGraphGenerator sets the snapshot settings whether 
> or not the job has checkpointing enabled:
> jobGraph.setSnapshotSettings(settings) 
> {code:java}
> private void configureCheckpointing() {
>     CheckpointConfig cfg = streamGraph.getCheckpointConfig();
>     long interval = cfg.getCheckpointInterval();
>     if (interval < MINIMAL_CHECKPOINT_TIME) {
>         // interval of max value means disable periodic checkpoint
>         interval = Long.MAX_VALUE;
>     }
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30131) flink iterate will suspend when record is a bit large

2022-12-15 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17647939#comment-17647939
 ] 

Yun Gao commented on FLINK-30131:
-

Hi [~landlord] do you still have a problem with this issue? 

> flink iterate will suspend when record is a bit large
> -
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png, 
> image-2022-11-24-17-10-45-651.png, image-2022-11-24-17-12-02-129.png, 
> image-2022-11-24-17-12-47-024.png
>
>
>  
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 1000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 1000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 1; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>  
> When I use iterate with big records, the iteration suspends at a random 
> place. When I looked at the stack, there was a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It seems like a network-related problem, so I increased the network buffer 
> memory and count, but that only delays the suspension point; it still 
> suspends after iterating a few more times than before.
> I want to know whether this is a bug or whether there is an error in my 
> code or configuration.
> Looking forward to your reply. Thanks in advance.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30403) The reported latest completed checkpoint is discarded

2022-12-15 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17647937#comment-17647937
 ] 

Yun Gao commented on FLINK-30403:
-

Hi [~tisonet], it looks to me that we have to keep the current order, since 
_addCompletedCheckpointToStoreAndSubsumeOldest_ may still fail; in that case 
the method will throw an exception and _reportCompletedCheckpoint_ will be 
skipped (see the sketch below). 

Also, it looks to me that there are no explicit issues caused by this window?
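An illustrative sketch of the ordering constraint (the method names follow 
the comment above; the bodies are stubs, not Flink's actual implementation):

{code:java}
public class CompletionOrder {

    void completePendingCheckpoint(long checkpointId) throws Exception {
        // May throw, e.g. if the completed-checkpoint store is unavailable...
        addCompletedCheckpointToStoreAndSubsumeOldest(checkpointId);
        // ...in which case the report below is skipped, so it must stay last.
        reportCompletedCheckpoint(checkpointId);
    }

    void addCompletedCheckpointToStoreAndSubsumeOldest(long checkpointId) throws Exception {
        // stub: add to the store, possibly failing
    }

    void reportCompletedCheckpoint(long checkpointId) {
        // stub: update the reported latest completed checkpoint
    }
}
{code}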

> The reported latest completed checkpoint is discarded
> -
>
> Key: FLINK-30403
> URL: https://issues.apache.org/jira/browse/FLINK-30403
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Zdenek Tison
>Priority: Major
>
> There is a small window in which the reported latest completed checkpoint 
> can be marked as discarded while the new checkpoint has not been reported 
> yet. 
> The reason is that the function 
> _addCompletedCheckpointToStoreAndSubsumeOldest_ is called before 
> _reportCompletedCheckpoint_ in _CheckpointCoordinator_.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30424) Add source operator restore readerState log to distinguish split is from newPartitions or split state

2022-12-15 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17647920#comment-17647920
 ] 

Yun Gao commented on FLINK-30424:
-

Hi [~lemonjing] could you point me to the position of 'assignedPartitions', 
or share the full content of this log?

> Add source operator restore readerState log to distinguish split is from 
> newPartitions or split state
> -
>
> Key: FLINK-30424
> URL: https://issues.apache.org/jira/browse/FLINK-30424
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.16.0, 1.15.3, 1.16.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> When a job starts for the first time, we can find 'assignPartitions' in the 
> log. But if the source recovers from state, we cannot distinguish whether 
> the newPartitions come from the timed discovery thread or from the reader 
> task state.
> We can add a helper log to distinguish these and confirm that the reader is 
> using the split state in the recovery situation. It is very useful for 
> troubleshooting. (See the sketch below.)
>  
>  
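> A hedged sketch of such a helper log (the names are illustrative, not the 
> actual SourceOperator fields):
> {code:java}
> import java.util.List;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> class ReaderStateLogging {
>     private static final Logger LOG = LoggerFactory.getLogger(ReaderStateLogging.class);
> 
>     static void logRestoredSplits(List<String> restoredSplits) {
>         if (restoredSplits.isEmpty()) {
>             LOG.info("No splits restored from reader state; "
>                     + "splits will come from partition discovery.");
>         } else {
>             LOG.info("Restored {} split(s) from reader state: {}",
>                     restoredSplits.size(), restoredSplits);
>         }
>     }
> }
> {code}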



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.

2022-12-08 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644676#comment-17644676
 ] 

Yun Gao commented on FLINK-30251:
-

Hi [~Ming Li], sorry, I need to add one more thing: as an alternative option, 
could you also double-check whether it is possible to bound the maximum time 
to wait for the closing to finish? 

> Move the IO with DFS during abort checkpoint to an asynchronous thread.
> ---
>
> Key: FLINK-30251
> URL: https://issues.apache.org/jira/browse/FLINK-30251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: ming li
>Priority: Major
> Attachments: image-2022-11-30-19-10-51-226.png
>
>
> Currently, when a {{checkpoint}} fails, we process the abort message in the 
> Task's {{mailbox}}: we close the output stream and delete the file on DFS. 
>  
> However, when the {{checkpoint}} failure is caused by a DFS system failure 
> (for example, a namenode failure in HDFS), this operation may take a long 
> time or hang, and the task will not be able to process data during this time.
>  
> So I think we can put the file-deletion operation in an asynchronous thread, 
> just like checkpoint data is uploaded asynchronously.
> !image-2022-11-30-19-10-51-226.png|width=731,height=347!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.

2022-12-08 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644674#comment-17644674
 ] 

Yun Gao edited comment on FLINK-30251 at 12/8/22 8:55 AM:
--

Hi [~Ming Li] many thanks for reporting the issue, and thanks [~Yanfei Lei] 
for tracking it! I also think this is indeed an issue and moving the work to 
an asynchronous thread should be a reasonable solution. [~Ming Li] could you 
first open a formal PR?

There is also one concern:
 # The current `asyncOperationsThreadPool` is a cached thread pool, which has 
no upper limit on the number of threads and creates a new thread whenever 
there is no free thread on task submission. Then if we have a large number of 
files to close, we might end up with a lot of threads, which might in turn 
cause large memory consumption (~1MB RSS per thread stack).
 # Thus we might change it to a thread pool with a limited maximum number of 
threads and an unbounded blocking queue (see the sketch below). Also, since 
the threads in this pool might block, we might need to use a separate thread 
pool.
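A minimal sketch of that suggestion (the pool size and names are 
illustrative, not actual Flink code): a fixed-size pool is exactly "bounded 
threads plus an unbounded queue", so a burst of files to close queues up 
instead of spawning unbounded threads as a cached pool would.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AbortIoPool {
    // Fixed pool = at most 4 threads + an unbounded LinkedBlockingQueue inside.
    private static final ExecutorService CLOSE_EXECUTOR = Executors.newFixedThreadPool(4);

    static void closeAsync(AutoCloseable outputStream) {
        CLOSE_EXECUTOR.execute(() -> {
            try {
                // May block on a hanging DFS, but now off the mailbox thread.
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace(); // best-effort cleanup: log and move on
            }
        });
    }
}
{code}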

 


was (Author: gaoyunhaii):
Hi [~Ming Li] many thanks for reporting the issue, and thanks [~Yanfei Lei] 
for tracking it! I also think this is indeed an issue and moving the work to 
an asynchronous thread should be a reasonable solution. [~Ming Li] could you 
first open a formal PR?



There is also one concern:
 # The current `asyncOperationsThreadPool` is a cached thread pool, which has 
no upper limit on the number of threads and creates a new thread whenever 
there is no free thread on task submission. Then if we have a large number of 
files to close, we might end up with a lot of threads, which might in turn 
cause large memory consumption (~1MB RSS per thread stack). We might change 
it to a thread pool with a limited maximum number of threads and an unbounded 
blocking queue. 

> Move the IO with DFS during abort checkpoint to an asynchronous thread.
> ---
>
> Key: FLINK-30251
> URL: https://issues.apache.org/jira/browse/FLINK-30251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: ming li
>Priority: Major
> Attachments: image-2022-11-30-19-10-51-226.png
>
>
> Currently, when a {{checkpoint}} fails, we process the abort message in the 
> Task's {{mailbox}}: we close the output stream and delete the file on DFS. 
>  
> However, when the {{checkpoint}} failure is caused by a DFS system failure 
> (for example, a namenode failure in HDFS), this operation may take a long 
> time or hang, and the task will not be able to process data during this time.
>  
> So I think we can put the file-deletion operation in an asynchronous thread, 
> just like checkpoint data is uploaded asynchronously.
> !image-2022-11-30-19-10-51-226.png|width=731,height=347!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.

2022-12-08 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644674#comment-17644674
 ] 

Yun Gao commented on FLINK-30251:
-

Hi [~Ming Li] many thanks for reporting the issue, and thanks [~Yanfei Lei] 
for tracking it! I also think this is indeed an issue and moving the work to 
an asynchronous thread should be a reasonable solution. [~Ming Li] could you 
first open a formal PR?



There is also one concern:
 # The current `asyncOperationsThreadPool` is a cached thread pool, which has 
no upper limit on the number of threads and creates a new thread whenever 
there is no free thread on task submission. Then if we have a large number of 
files to close, we might end up with a lot of threads, which might in turn 
cause large memory consumption (~1MB RSS per thread stack). We might change 
it to a thread pool with a limited maximum number of threads and an unbounded 
blocking queue. 

> Move the IO with DFS during abort checkpoint to an asynchronous thread.
> ---
>
> Key: FLINK-30251
> URL: https://issues.apache.org/jira/browse/FLINK-30251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: ming li
>Priority: Major
> Attachments: image-2022-11-30-19-10-51-226.png
>
>
> Currently, when a {{checkpoint}} fails, we process the abort message in the 
> Task's {{mailbox}}: we close the output stream and delete the file on DFS. 
>  
> However, when the {{checkpoint}} failure is caused by a DFS system failure 
> (for example, a namenode failure in HDFS), this operation may take a long 
> time or hang, and the task will not be able to process data during this time.
>  
> So I think we can put the file-deletion operation in an asynchronous thread, 
> just like checkpoint data is uploaded asynchronously.
> !image-2022-11-30-19-10-51-226.png|width=731,height=347!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-12-01 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-27341.
---
Resolution: Fixed

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> If some TaskManagers run with the JobManager on the same machine while some 
> other TaskManagers do not, the TaskManagers running together with the 
> JobManager would bind to localhost or 127.0.0.1, which makes the Netty 
> connections across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-12-01 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642246#comment-17642246
 ] 

Yun Gao commented on FLINK-27341:
-

Merged on master via fc5e8bacef34119579defca6256476482da523f9

Merged on 1.15 via 84e8806af16f81a4295ca1f9b0c711c210884b1d

Merged on 1.16 via 8a47420adfe9af7f1c303874c7a32dab3229ea66

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> If some TaskManagers run with the JobManager on the same machine while some 
> other TaskManagers do not, the TaskManagers running together with the 
> JobManager would bind to localhost or 127.0.0.1, which makes the Netty 
> connections across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-12-01 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-27341:

Fix Version/s: 1.17.0
   1.16.1
   1.15.4

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> If some TaskManagers run with the JobManager on the same machine while some 
> other TaskManagers do not, the TaskManagers running together with the 
> JobManager would bind to localhost or 127.0.0.1, which makes the Netty 
> connections across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30131) flink iterate will suspend when record is a bit large

2022-12-01 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641760#comment-17641760
 ] 

Yun Gao commented on FLINK-30131:
-

Hi [~landlord], from the attached image it seems the iteration sink now has 
busy = 100%, which seems to backpressure the previous tasks. Could you also 
check whether the sink is the bottleneck?

> flink iterate will suspend when record is a bit large
> -
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png, 
> image-2022-11-24-17-10-45-651.png, image-2022-11-24-17-12-02-129.png, 
> image-2022-11-24-17-12-47-024.png
>
>
>  
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 1000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 1000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 1; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>  
> When I use iterate with big records, the iteration suspends at a random 
> place. When I looked at the stack, there was a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It seems like a network-related problem, so I increased the network buffer 
> memory and count, but that only delays the suspension point; it still 
> suspends after iterating a few more times than before.
> I want to know whether this is a bug or whether there is an error in my 
> code or configuration.
> Looking forward to your reply. Thanks in advance.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30081) Local executor can not accept different jvm-overhead.min/max values

2022-12-01 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641751#comment-17641751
 ] 

Yun Gao commented on FLINK-30081:
-

Hi [~liuml07], the mini-cluster does not start new processes; it just 
executes the different components in different threads. Thus I think it has 
no chance to change the memory settings, and you might instead increase the 
memory of the test JVM directly on startup. 

> Local executor can not accept different jvm-overhead.min/max values
> ---
>
> Key: FLINK-30081
> URL: https://issues.apache.org/jira/browse/FLINK-30081
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.16.0
>Reporter: Mingliang Liu
>Priority: Major
>
> In the local executor, it's not possible to set different values for 
> {{taskmanager.memory.jvm-overhead.max}} and 
> {{taskmanager.memory.jvm-overhead.min}}. The same problem exists for 
> {{taskmanager.memory.network.max}} and {{taskmanager.memory.network.min}}.
> Sample code to reproduce:
> {code:java}
> Configuration conf = new Configuration();
> conf.setString(TaskManagerOptions.JVM_OVERHEAD_MIN.key(), "1GB");
> conf.setString(TaskManagerOptions.JVM_OVERHEAD_MAX.key(), "2GB");
> StreamExecutionEnvironment.createLocalEnvironment(conf)
> .fromElements("Hello", "World")
> .executeAndCollect()
> .forEachRemaining(System.out::println); {code}
> The failing exception is something like:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException
>   at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.calculateTotalProcessMemoryFromComponents(TaskExecutorResourceUtils.java:182)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration.create(TaskExecutorMemoryConfiguration.java:119)
> {code}
> I think the problem is that we expect the max and min to be equal, but the 
> local executor did not reset them correctly?
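> A hedged workaround sketch under that assumption (that the local executor 
> requires min == max): keep the two values equal when running locally.
> {code:java}
> Configuration conf = new Configuration();
> // Keep min == max so the local executor's precondition check passes.
> conf.setString(TaskManagerOptions.JVM_OVERHEAD_MIN.key(), "1GB");
> conf.setString(TaskManagerOptions.JVM_OVERHEAD_MAX.key(), "1GB");
> StreamExecutionEnvironment.createLocalEnvironment(conf)
>         .fromElements("Hello", "World")
>         .executeAndCollect()
>         .forEachRemaining(System.out::println);
> {code}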



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27518) Refactor migration tests to support version update automatically

2022-11-29 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641079#comment-17641079
 ] 

Yun Gao commented on FLINK-27518:
-

Hi [~mapohl] sorry, I have been a bit tied up in recent weeks; I'll try to 
finish this issue as soon as possible. 

> Refactor migration tests to support version update automatically
> 
>
> Key: FLINK-27518
> URL: https://issues.apache.org/jira/browse/FLINK-27518
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, on releasing each version, we need to manually generate the 
> snapshots for every migration test and update the current versions. As more 
> and more migration tests are added, this has become more and more 
> intractable. It would be better if we could make this happen automatically 
> when cutting new branches. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28973) Extending /jars/:jarid/plan API to support setting Flink configs

2022-11-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638208#comment-17638208
 ] 

Yun Gao commented on FLINK-28973:
-

Many thanks [~ConradJam] for the update! [~Zhanghao Chen] I'll close this 
issue for now; if there are other issues, let's reopen it. 

> Extending /jars/:jarid/plan API to support setting Flink configs
> 
>
> Key: FLINK-28973
> URL: https://issues.apache.org/jira/browse/FLINK-28973
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Zhanghao Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638198#comment-17638198
 ] 

Yun Gao commented on FLINK-30184:
-

Thanks [~fanrui] for the explanation! Now I understand the issue. 

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816, FLINK-25398 and FLINK-25372, Flink users can view the 
> thread stack of the TM/JM in the Flink WebUI. 
> It can help Flink users find out why a Flink job is stuck or why processing 
> is slow. It is very useful for troubleshooting.
> However, sometimes Flink tasks get stuck or process slowly, but by the time 
> the user troubleshoots the problem, the job has already recovered. It is 
> then difficult to find out what happened to the Flink job at that time and 
> why it was slow.
>  
> So, could we periodically save the thread stack of the TM or JM in the TM 
> log directory?
> Define some configurations:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours
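> A minimal standalone sketch of the proposed mechanism (the interval and 
> directory mirror the proposed configuration above; these are not actual 
> Flink options):
> {code:java}
> import java.lang.management.ManagementFactory;
> import java.lang.management.ThreadInfo;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> import java.time.Instant;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> 
> public class PeriodicThreadDump {
>     public static void main(String[] args) {
>         ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
>         Path logDir = Paths.get("/tmp/flink-thread-dumps"); // stands in for the TM log dir
>         scheduler.scheduleAtFixedRate(() -> {
>             try {
>                 Files.createDirectories(logDir);
>                 StringBuilder dump = new StringBuilder();
>                 // Note: ThreadInfo.toString() truncates very deep stacks.
>                 for (ThreadInfo info :
>                         ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
>                     dump.append(info.toString());
>                 }
>                 Files.writeString(
>                         logDir.resolve("dump-" + Instant.now().toEpochMilli() + ".txt"),
>                         dump.toString());
>             } catch (Exception e) {
>                 e.printStackTrace(); // never let the dump task die silently
>             }
>         }, 0, 1, TimeUnit.MINUTES); // cluster.thread-dump.interval=1min
>     }
> }
> {code}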



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638190#comment-17638190
 ] 

Yun Gao commented on FLINK-29856:
-

Hi [~mason6345], may I double-check how you verified that 
snapshotState/notifyCheckpointComplete is not called?

> Triggering savepoint does not trigger source operator checkpoint 
> -
>
> Key: FLINK-29856
> URL: https://issues.apache.org/jira/browse/FLINK-29856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Mason Chen
>Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke snapshotState 
> or notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and 
> successful, which is verified by the Flink Checkpoint UI tab and the 
> jobmanager logs, e.g. `Triggering checkpoint 3 
> (type=SavepointType\{name='Savepoint', postCheckpointAction=NONE, 
> formatType=CANONICAL})`.
>  
> However, when a checkpoint occurs via the interval, I do see the sources 
> checkpointing properly and the expected logs in the output. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638166#comment-17638166
 ] 

Yun Gao commented on FLINK-30184:
-

Hi [~fanrui] perhaps [FlameGraph | 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/]
 could provide the functionality? 

 

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816, FLINK-25398 and FLINK-25372, Flink users can view the 
> thread stack of the TM/JM in the Flink WebUI. 
> It helps users find out why a Flink job is stuck or why the processing is 
> slow, which is very useful for troubleshooting.
> However, sometimes Flink tasks get stuck or process slowly, but by the time 
> the user troubleshoots the problem, the job has already resumed. It is then 
> hard to find out what happened to the Flink job at the time and why it was slow.
>  
> So, could we periodically save the thread stack of the TM or JM in the TM log 
> directory?
> We could define some configuration options such as:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30131) flink iterate will suspend when record is a bit large

2022-11-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638164#comment-17638164
 ] 

Yun Gao commented on FLINK-30131:
-

Hi [~landlord], sorry for the late response. First, may I confirm whether you 
have checked the memory usage of the process? Currently the legacy iteration 
uses an in-memory ArrayDeque for the backward edge, so perhaps the processing 
is stuck doing GC?
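
One quick way to test the GC hypothesis from inside the JVM (a standalone sketch using only the standard JMX beans, not part of the report) is to poll the collector statistics:

{code:java}
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcProbe {
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                // Collection time climbing while throughput is zero would point
                // at the iteration being stuck in GC rather than in the network stack.
                System.out.printf("%s: count=%d time=%dms%n",
                        gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
            }
            Thread.sleep(5_000);
        }
    }
}
{code}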

> flink iterate will suspend when record is a bit large
> -
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png
>
>
>  
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, 
> MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 
> 1000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate",
>  1000);
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> // note: the loop upper bound appears truncated in the original report
> for (int i = 1; i < 1; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new 
> byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], 
> byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true 
> ).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>  
> When I use iterate with big records, the iteration suspends at a random 
> place. When I looked at the stack, there was a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It seems like a network-related problem, so I increased the network buffer 
> memory and count, but that only delays the suspension point; it still suspends 
> after a few more iterations than before.
> I want to know whether this is a bug or whether there is an error in my code 
> or configuration.
> Looking forward to your reply. Thanks in advance.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-16582) NettyBufferPoolTest may have warns on NettyBuffer leak

2022-11-22 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637079#comment-17637079
 ] 

Yun Gao commented on FLINK-16582:
-

Merged on master via 88a161101bb33b4c088325788bd11d41f9369355

Merged on release-1.16 via  d8ff3216101fd31dbcdc4a725d2e7ead4113

Merged on release-1.15 via 28a877b8880e271123ce674e8669dfe6c6954f1f

 

> NettyBufferPoolTest may have warns on NettyBuffer leak 
> ---
>
> Key: FLINK-16582
> URL: https://issues.apache.org/jira/browse/FLINK-16582
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> {code:java}
> 4749 [Flink Netty Client (50072) Thread 0] ERROR
> org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector [] - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
>   
> org.apache.flink.runtime.io.network.netty.NettyBufferPoolTest.testNoHeapAllocations(NettyBufferPoolTest.java:38)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runners.Suite.runChild(Suite.java:128)
>   org.junit.runners.Suite.runChild(Suite.java:27)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Test ignored.
> Process finished with exit code 0
> {code}
> We should release the allocated buffers in the tests.
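
The usual fix pattern is to pair each allocation with a release in a finally block, roughly like the following sketch (the test class name and assertions are illustrative, not the actual patch):

{code:java}
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.junit.Test;

public class NettyBufferPoolReleaseTest {

    @Test
    public void allocatedBuffersAreReleased() {
        NettyBufferPool allocator = new NettyBufferPool(1); // one arena is enough for a test
        ByteBuf buffer = allocator.buffer(1024);
        try {
            // ... assertions on the buffer ...
        } finally {
            buffer.release(); // drop the refcount so the leak detector stays quiet
        }
    }
}
{code}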



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-16582) NettyBufferPoolTest may have warns on NettyBuffer leak

2022-11-22 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-16582.
---
Resolution: Fixed

> NettyBufferPoolTest may have warns on NettyBuffer leak 
> ---
>
> Key: FLINK-16582
> URL: https://issues.apache.org/jira/browse/FLINK-16582
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> {code:java}
> 4749 [Flink Netty Client (50072) Thread 0] ERROR
> org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector [] - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
>   
> org.apache.flink.runtime.io.network.netty.NettyBufferPoolTest.testNoHeapAllocations(NettyBufferPoolTest.java:38)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runners.Suite.runChild(Suite.java:128)
>   org.junit.runners.Suite.runChild(Suite.java:27)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Test ignored.
> Process finished with exit code 0
> {code}
> We should release the allocated buffers in the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-16582) NettyBufferPoolTest may have warns on NettyBuffer leak

2022-11-22 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-16582:
---

Assignee: Yun Gao

> NettyBufferPoolTest may have warns on NettyBuffer leak 
> ---
>
> Key: FLINK-16582
> URL: https://issues.apache.org/jira/browse/FLINK-16582
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
> Fix For: 1.17.0
>
>
> {code:java}
> 4749 [Flink Netty Client (50072) Thread 0] ERROR
> org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector [] - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
>   
> org.apache.flink.runtime.io.network.netty.NettyBufferPoolTest.testNoHeapAllocations(NettyBufferPoolTest.java:38)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runners.Suite.runChild(Suite.java:128)
>   org.junit.runners.Suite.runChild(Suite.java:27)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Test ignored.
> Process finished with exit code 0
> {code}
> We should release the allocated buffers in the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-16582) NettyBufferPoolTest may have warns on NettyBuffer leak

2022-11-22 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-16582:

Fix Version/s: 1.16.1
   1.15.4

> NettyBufferPoolTest may have warns on NettyBuffer leak 
> ---
>
> Key: FLINK-16582
> URL: https://issues.apache.org/jira/browse/FLINK-16582
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> {code:java}
> 4749 [Flink Netty Client (50072) Thread 0] ERROR
> org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector [] - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
>   
> org.apache.flink.runtime.io.network.netty.NettyBufferPoolTest.testNoHeapAllocations(NettyBufferPoolTest.java:38)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runners.Suite.runChild(Suite.java:128)
>   org.junit.runners.Suite.runChild(Suite.java:27)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Test ignored.
> Process finished with exit code 0
> {code}
> We should release the allocated buffers in the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-16582) NettyBufferPoolTest may have warns on NettyBuffer leak

2022-11-22 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-16582:

Labels: pull-request-available  (was: auto-deprioritized-major 
auto-unassigned pull-request-available)

> NettyBufferPoolTest may have warns on NettyBuffer leak 
> ---
>
> Key: FLINK-16582
> URL: https://issues.apache.org/jira/browse/FLINK-16582
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> {code:java}
> 4749 [Flink Netty Client (50072) Thread 0] ERROR
> org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector [] - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
>   
> org.apache.flink.runtime.io.network.netty.NettyBufferPoolTest.testNoHeapAllocations(NettyBufferPoolTest.java:38)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runners.Suite.runChild(Suite.java:128)
>   org.junit.runners.Suite.runChild(Suite.java:27)
>   org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Test ignored.
> Process finished with exit code 0
> {code}
> We should release the allocated buffers in the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-11-20 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-29557.
---
Fix Version/s: 1.17.0
   Resolution: Fixed

> The SinkOperator's OutputFormat function is not recognized
> --
>
> Key: FLINK-29557
> URL: https://issues.apache.org/jira/browse/FLINK-29557
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Table SQL / API
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and 
> registered as a {{SimpleOutputFormatOperatorFactory}}, so the output-format 
> information in {{SinkOperator}} is lost. As a result, hook functions like 
> {{FinalizeOnMaster}} have no chance to be executed.
> Because {{SinkOperator}} lives in the table module, it cannot be reached 
> directly from {{flink-streaming-java}}. So maybe we need to introduce an extra 
> common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator 
> and handle it individually.
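
For context, the hook in question looks roughly like this: an OutputFormat that also implements FinalizeOnMaster gets a finalizeGlobal() call on the master after the job finishes, but only if the factory preserves the output-format information. The class below is a schematic example, not code from the PR:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

public class CommitOnMasterOutputFormat implements OutputFormat<String>, FinalizeOnMaster {

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {}

    @Override
    public void writeRecord(String record) throws IOException {
        // write the record to a staging location
    }

    @Override
    public void close() throws IOException {}

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // Runs once on the master after all tasks finish, e.g. to commit staged
        // files; with the bug above it is silently skipped for SinkOperator.
    }
}
{code}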



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-11-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636277#comment-17636277
 ] 

Yun Gao commented on FLINK-29557:
-

Merged on master via a56af3b5702f805b477f8cf11a27167e626ad9fe.

> The SinkOperator's OutputFormat function is not recognized
> --
>
> Key: FLINK-29557
> URL: https://issues.apache.org/jira/browse/FLINK-29557
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Table SQL / API
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and 
> registered as a {{SimpleOutputFormatOperatorFactory}}, so the output-format 
> information in {{SinkOperator}} is lost. As a result, hook functions like 
> {{FinalizeOnMaster}} have no chance to be executed.
> Because {{SinkOperator}} lives in the table module, it cannot be reached 
> directly from {{flink-streaming-java}}. So maybe we need to introduce an extra 
> common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator 
> and handle it individually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635269#comment-17635269
 ] 

Yun Gao edited comment on FLINK-29572 at 11/17/22 10:04 AM:


Hi all, many thanks for the discussion! I'll close this issue as a duplicate. 
Let's head to https://issues.apache.org/jira/browse/FLINK-27341 to track the 
progress. 


was (Author: gaoyunhaii):
Hi all, I'll first close this issue for its duplicated. Let's head to 
https://issues.apache.org/jira/browse/FLINK-27341 to track the progress. 

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.3
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces when 
> binding a connection to the Resource Manager; the first one is the loopback 
> interface. Normally, if the Job Manager is running on a remote host/container, 
> connecting via the loopback interface fails and the correct IP address is 
> picked up.
> However, if the Task Manager runs behind some proxy, the loopback interface 
> can reach the remote host as well. This results in 127.0.0.1 being reported to 
> the Resource Manager during registration even though the Job Manager/Resource 
> Manager runs on a remote host, and problems follow. For us, only one Task 
> Manager can register in this case.
> I suggest adding a configuration option to skip the loopback interface check 
> when we know the Job/Resource Manager is running on a remote host/container.
>  
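
For illustration, the loopback check the reporter proposes to make skippable boils down to filtering the candidate addresses, roughly as in this JDK-only sketch (not Flink's actual connection-strategy code; the class name is hypothetical):

{code:java}
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NonLoopbackAddresses {

    /** Returns all local addresses, optionally skipping loopback ones. */
    static List<InetAddress> candidates(boolean skipLoopback) throws Exception {
        List<InetAddress> result = new ArrayList<>();
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (skipLoopback && addr.isLoopbackAddress()) {
                    continue; // never report 127.0.0.1 to a remote ResourceManager
                }
                result.add(addr);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(candidates(true));
    }
}
{code}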



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-11-17 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-29572.
---
Resolution: Duplicate

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.3
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces when 
> binding a connection to the Resource Manager; the first one is the loopback 
> interface. Normally, if the Job Manager is running on a remote host/container, 
> connecting via the loopback interface fails and the correct IP address is 
> picked up.
> However, if the Task Manager runs behind some proxy, the loopback interface 
> can reach the remote host as well. This results in 127.0.0.1 being reported to 
> the Resource Manager during registration even though the Job Manager/Resource 
> Manager runs on a remote host, and problems follow. For us, only one Task 
> Manager can register in this case.
> I suggest adding a configuration option to skip the loopback interface check 
> when we know the Job/Resource Manager is running on a remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635269#comment-17635269
 ] 

Yun Gao commented on FLINK-29572:
-

Hi all, I'll close this issue as a duplicate. Let's head to 
https://issues.apache.org/jira/browse/FLINK-27341 to track the progress. 

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.3
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces when 
> binding a connection to the Resource Manager; the first one is the loopback 
> interface. Normally, if the Job Manager is running on a remote host/container, 
> connecting via the loopback interface fails and the correct IP address is 
> picked up.
> However, if the Task Manager runs behind some proxy, the loopback interface 
> can reach the remote host as well. This results in 127.0.0.1 being reported to 
> the Resource Manager during registration even though the Job Manager/Resource 
> Manager runs on a remote host, and problems follow. For us, only one Task 
> Manager can register in this case.
> I suggest adding a configuration option to skip the loopback interface check 
> when we know the Job/Resource Manager is running on a remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29610) Infinite timeout is used in SavepointHandlers and CheckpointTriggerHandler calls to RestfulGateway

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635265#comment-17635265
 ] 

Yun Gao commented on FLINK-29610:
-

Thanks [~Jiale] for the PR! I'll have a look.

> Infinite timeout is used in SavepointHandlers and CheckpointTriggerHandler 
> calls to RestfulGateway
> --
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Assignee: Jiale Tan
>Priority: Major
>  Labels: pull-request-available
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
> Same thing happens in the 
> {{[CheckpointTriggerHandler|https://github.com/apache/flink/blob/8e66be89dfcb54b7256d51e9d89222ae6701061f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java#L146]}}
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.
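
As a plain-Java illustration of the alternative (not the handlers' actual code; the class name is hypothetical), a bounded timeout on an asynchronous call can be expressed with CompletableFuture#orTimeout:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BoundedTimeoutExample {
    public static void main(String[] args) {
        // Stand-in for the asynchronous gateway call; it is never completed here.
        CompletableFuture<String> trigger = new CompletableFuture<>();
        try {
            // With an infinite timeout the join below would block forever; orTimeout
            // makes it fail with a TimeoutException after the given bound.
            trigger.orTimeout(3, TimeUnit.SECONDS).join();
        } catch (Exception e) {
            System.out.println("gave up instead of waiting forever: " + e.getCause());
        }
    }
}
{code}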



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-11-17 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-29498.
---
Fix Version/s: 1.17.0
   Resolution: Fixed

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.3
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> We are using async I/O to make HTTP calls, and one of the features we 
> wanted to leverage was retries, so we pulled the newest commit 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error, 
> as {{unorderedWaitWithRetry}} expects a Scala retry strategy. The problem is 
> that retry strategies were only implemented in Java, not Scala, in this PR: 
> [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}
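
For comparison, the equivalent call through the Java API does work. A sketch, assuming source is a DataStream<Integer> and asyncOperator an AsyncFunction<Integer, Integer> defined elsewhere in the job:

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;

// Assumed to exist in the surrounding job: DataStream<Integer> source,
// AsyncFunction<Integer, Integer> asyncOperator.
AsyncRetryStrategy<Integer> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<Integer>(3, 100L)
                .build();

DataStream<Integer> data = AsyncDataStream.unorderedWaitWithRetry(
        source, asyncOperator, 10_000L, TimeUnit.MILLISECONDS, retryStrategy);
{code}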



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635258#comment-17635258
 ] 

Yun Gao commented on FLINK-29498:
-

Merged on master via d426489c9e5c634e2eec8fde6c71356700b7d4b2.

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.3
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>  Labels: pull-request-available
>
> We are using async I/O to make HTTP calls, and one of the features we 
> wanted to leverage was retries, so we pulled the newest commit 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error, 
> as {{unorderedWaitWithRetry}} expects a Scala retry strategy. The problem is 
> that retry strategies were only implemented in Java, not Scala, in this PR: 
> [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29940) ExecutionGraph logs job state change at ERROR level when job fails

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635245#comment-17635245
 ] 

Yun Gao commented on FLINK-29940:
-

Hi [~liuml07], it looks to me that this might not be an error from the 
JobManager's perspective, since the JM is able to recover from this status. 
Also, may I double-check why we want to make it ERROR level? If it is for 
monitoring purposes, the log might not be very stable, and it might be better 
to rely on metrics like numberOfRestarts. 

> ExecutionGraph logs job state change at ERROR level when job fails
> --
>
> Key: FLINK-29940
> URL: https://issues.apache.org/jira/browse/FLINK-29940
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Mingliang Liu
>Priority: Minor
>  Labels: pull-request-available
>
> When a job switches to the FAILED state, the log is very useful for 
> understanding why it failed, along with the root-cause exception stack. 
> However, the current log level is INFO, which is a bit inconvenient for users 
> searching the logs among so many surrounding lines. We could log at ERROR 
> level when the job switches to the FAILED state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29925) table ui of configure value is strange

2022-11-17 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635235#comment-17635235
 ] 

Yun Gao commented on FLINK-29925:
-

cc [~junhan] 

> table ui of configure value is strange
> --
>
> Key: FLINK-29925
> URL: https://issues.apache.org/jira/browse/FLINK-29925
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.15.3
>Reporter: jiadong.lu
>Priority: Minor
> Attachments: 截屏2022-11-08 15.37.04.png
>
>
> As shown in the figure below, when the configuration value is very long, the 
> table UI looks a bit strange:  !截屏2022-11-08 15.37.04.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-11-15 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634670#comment-17634670
 ] 

Yun Gao commented on FLINK-27341:
-

Many thanks [~huwh] for checking! I'll have a look.

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> If some TaskManagers run on the same machine as the JobManager while other 
> TaskManagers do not, the TaskManagers running together with the JobManager 
> would bind to localhost or 127.0.0.1, which makes the Netty connections 
> across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-11-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631342#comment-17631342
 ] 

Yun Gao commented on FLINK-27341:
-

Hi [~chesnay], sorry for missing the comment. I also think we should drop the 
loopback address, but it may need some tests for different deployment targets. 

And many thanks [~huwh] for helping to track this issue! I have assigned the 
issue to you. As for the method, do you think we might directly remove the 
LOOPBACK strategy? I previously made an attempt with 
[https://github.com/gaoyunhaii/flink/pull/new/remove_loopback] and it should 
work in a standalone session. 

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>
> If some TaskManagers run on the same machine as the JobManager while other 
> TaskManagers do not, the TaskManagers running together with the JobManager 
> would bind to localhost or 127.0.0.1, which makes the Netty connections 
> across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-11-09 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-27341:
---

Assignee: Weihua Hu

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Weihua Hu
>Priority: Major
>
> If some TaskManagers run on the same machine as the JobManager while other 
> TaskManagers do not, the TaskManagers running together with the JobManager 
> would bind to localhost or 127.0.0.1, which makes the Netty connections 
> across the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22587) Support aggregations in batch mode with DataStream API

2022-11-07 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17630193#comment-17630193
 ] 

Yun Gao commented on FLINK-22587:
-

Hi, sorry for forgetting to update here. After some more experiments we found 
that it works to use an event-time window that assigns all the records to the 
same window [0, 
+Inf): [https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java]

And the join works with 

source1.join(source2)
        .where(a -> a.f0)
        .equalTo(b -> b.f0)
        .window(new EndOfStreamWindows())
        .apply(xx)

for both bounded stream processing and batch processing. It does not require 
the records to have event times and watermarks, since the window assignment 
does not rely on event time, and the window will be triggered by the 
Long.MAX_VALUE watermark inserted at the end of the stream. 

But we'll still try to propose a proper fix for this issue. One option is not 
to force setting a window in this case: if no window is set, we would by 
default put all the records into a single window. 
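
The linked EndOfStreamWindows boils down to a window assigner that puts every record into the single window [0, Long.MAX_VALUE). The following is a condensed sketch of the idea, not the real implementation (see the linked file for that):

{code:java}
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Assigns every record to the single window [0, Long.MAX_VALUE). */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow ALL = new TimeWindow(0, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(ALL);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Fires when the watermark passes maxTimestamp(), i.e. only when the
        // final Long.MAX_VALUE watermark arrives at the end of the input.
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
{code}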

> Support aggregations in batch mode with DataStream API
> --
>
> Key: FLINK-22587
> URL: https://issues.apache.org/jira/browse/FLINK-22587
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.0, 1.13.0
>Reporter: Etienne Chauchot
>Priority: Major
>
> A pipeline like this *in batch mode* would output no data
> {code:java}
> stream.join(otherStream)
> .where()
> .equalTo()
> .window(GlobalWindows.create())
> .apply()
> {code}
> Indeed, the default trigger for GlobalWindow is NeverTrigger, which never 
> fires. If we set an _EventTimeTrigger_, it will fire with every element, as 
> the watermark will be set to +INF (batch mode) and will pass the end of the 
> global window with each new element. A _ProcessingTimeTrigger_ never fires 
> either, and all elapsed-time or delta-based triggers are not suited for 
> batch.
> The same goes for _reduce()_ instead of join().
> So I guess we are missing something for batch support with the DataStream API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28469) Setting a timer within broadcast applyToKeyedState()

2022-11-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628176#comment-17628176
 ] 

Yun Gao commented on FLINK-28469:
-

Hi [~Sandys-Lumsdaine], logically this is a bit hard to achieve, since the 
timer service relies on a global current key, but applyToKeyedState() does not 
modify the current key. May I ask why you need this 
functionality?
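
For what it's worth, a common workaround is to only flag the key inside applyToKeyedState() and defer the actual timer registration to the keyed side, where a key context exists. A hedged sketch, with all names hypothetical:

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DeferredTimerFunction
        extends KeyedBroadcastProcessFunction<String, String, String, String> {

    private static final ValueStateDescriptor<Boolean> TIMER_WANTED =
            new ValueStateDescriptor<>("timerWanted", Boolean.class);

    private transient ValueState<Boolean> timerWanted;

    @Override
    public void open(Configuration parameters) {
        timerWanted = getRuntimeContext().getState(TIMER_WANTED);
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        if (Boolean.TRUE.equals(timerWanted.value())) {
            // The key context is set here, so registering a timer is allowed.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
            timerWanted.update(false);
        } else if (timerWanted.value() == null) {
            timerWanted.update(false); // make this key visible to applyToKeyedState()
        }
        out.collect(value);
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // No key context here, hence no timers; only flag every key that has state.
        ctx.applyToKeyedState(TIMER_WANTED, (key, state) -> state.update(true));
    }
}
{code}

The obvious limitation is that the timer only gets registered when the next element for that key arrives, which may or may not be acceptable for the use case described.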

> Setting a timer within broadcast applyToKeyedState()
> 
>
> Key: FLINK-28469
> URL: https://issues.apache.org/jira/browse/FLINK-28469
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.16.0
>Reporter: James
>Priority: Minor
>
> I know we can’t set a timer in the processBroadcastElement() of the 
> KeyedBroadcastProcessFunction as there is no key.
>  
> However, there is a context.applyToKeyedState() method which allows us to 
> iterate over the keyed state in the scope of a key. So would it be possible 
> to add access to the TimerService on the Context parameter passed into that 
> delegate?
>  
> Since the code running in the applyToKeyedState() callback is scoped to a 
> key, we should be able to set up timers for that key too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28455) pyflink tableResult collect result to local timeout

2022-11-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628168#comment-17628168
 ] 

Yun Gao commented on FLINK-28455:
-

[~hxb], could you help take a look at this issue?

> pyflink tableResult collect result to local  timeout
> 
>
> Key: FLINK-28455
> URL: https://issues.apache.org/jira/browse/FLINK-28455
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: zhou
>Priority: Major
>  Labels: flink, pyflink
>
> When I used PyFlink to do this:
>  
> {code:java}
> with party_enter_final_result.execute().collect() as results:
>      for result in results:{code}
> sometimes a TimeoutException occurred; the exception is as follows:
> {code:java}
> [2022-07-07 01:18:55,843] {bash.py:173} INFO - Job has been submitted with 
> JobID 017de55acf2a71552fc293626cfbbe67
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - Traceback (most recent call 
> last):
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/opt/airflow/data/repo/dags/chloe/chloe_counter.py", line 80, in 
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -     main(date)
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/opt/airflow/data/repo/dags/chloe/chloe_counter.py", line 53, in main
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -     for result in results:
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 
> 236, in __next__
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 
> 1285, in __call__
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, 
> in deco
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, 
> in get_return_value
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - py4j.protocol.Py4JJavaError: 
> An error occurred while calling o66.hasNext.
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - : java.lang.RuntimeException: 
> Failed to fetch next result
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> java.lang.reflect.Method.invoke(Method.java:498)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> java.lang.Thread.run(Thread.java:748)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO - Caused by: 
> java.io.IOException: Failed to fetch job execution result
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> 

[jira] [Updated] (FLINK-28455) pyflink tableResult collect result to local timeout

2022-11-03 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-28455:

Component/s: API / Python
 (was: API / Core)

> pyflink tableResult collect result to local  timeout
> 
>
> Key: FLINK-28455
> URL: https://issues.apache.org/jira/browse/FLINK-28455
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: zhou
>Priority: Major
>  Labels: flink, pyflink
>
> When I used PyFlink to do this:
>  
> {code:java}
> with party_enter_final_result.execute().collect() as results:
>      for result in results:{code}
> sometimes a TimeoutException occurred; the exception is as follows:
> {code:java}
> [2022-07-07 01:18:55,843] {bash.py:173} INFO - Job has been submitted with 
> JobID 017de55acf2a71552fc293626cfbbe67
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - Traceback (most recent call 
> last):
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/opt/airflow/data/repo/dags/chloe/chloe_counter.py", line 80, in 
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -     main(date)
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/opt/airflow/data/repo/dags/chloe/chloe_counter.py", line 53, in main
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -     for result in results:
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 
> 236, in __next__
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 
> 1285, in __call__
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, 
> in deco
> [2022-07-07 01:20:02,384] {bash.py:173} INFO -   File 
> "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, 
> in get_return_value
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - py4j.protocol.Py4JJavaError: 
> An error occurred while calling o66.hasNext.
> [2022-07-07 01:20:02,384] {bash.py:173} INFO - : java.lang.RuntimeException: 
> Failed to fetch next result
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> java.lang.reflect.Method.invoke(Method.java:498)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO -     at 
> java.lang.Thread.run(Thread.java:748)
> [2022-07-07 01:20:02,385] {bash.py:173} INFO - Caused by: 
> java.io.IOException: Failed to fetch job execution result
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
> [2022-07-07 01:20:02,386] {bash.py:173} INFO -     at 
> 

[jira] [Commented] (FLINK-28598) ClusterEntryPoint can't get the real exit reason when shutting down

2022-11-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628167#comment-17628167
 ] 

Yun Gao commented on FLINK-28598:
-

[~zlzhang0122] Could you confirm which deployment mode you are using? I'm not 
quite sure at which point the user code would be executed during startup. 

> ClusterEntryPoint can't get the real exit reason when shutting down
> ---
>
> Key: FLINK-28598
> URL: https://issues.apache.org/jira/browse/FLINK-28598
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Task
>Affects Versions: 1.14.2, 1.15.1
>Reporter: zlzhang0122
>Priority: Major
>
> When the cluster is starting and an error occurs, the ClusterEntryPoint will 
> shut down the cluster asynchronously, but if it cannot obtain a Throwable, the 
> shutdown reason will be null; this can actually happen when the problem is in 
> user code. 
> I think we can capture the real exit reason caused by user code and pass it to 
> the diagnostics parameter; this would help users a lot.
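
As a sketch of what the proposal could look like (the method and helper names 
below are stand-ins for illustration, not the actual ClusterEntrypoint code):

{code:java}
import java.util.concurrent.CompletableFuture;

public class StartupDiagnosticsSketch {

    /** Stand-in for the entrypoint's async shutdown; today diagnostics may be null. */
    static CompletableFuture<Void> shutDownAsync(String applicationStatus, String diagnostics) {
        System.out.println("status=" + applicationStatus + ", diagnostics=" + diagnostics);
        return CompletableFuture.completedFuture(null);
    }

    /** Proposed: derive the diagnostics from the captured user-code failure. */
    static CompletableFuture<Void> shutDownOnStartupFailure(Throwable userCodeFailure) {
        String diagnostics = (userCodeFailure == null)
                ? "Cluster entrypoint shut down without an explicit failure cause."
                : userCodeFailure.getClass().getName() + ": " + userCodeFailure.getMessage();
        return shutDownAsync("FAILED", diagnostics);
    }

    public static void main(String[] args) {
        shutDownOnStartupFailure(new IllegalStateException("failure from user main()"));
    }
}
{code}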



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29364) Root cause of Exceptions thrown in the SourceReader start() method gets "swallowed".

2022-11-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628164#comment-17628164
 ] 

Yun Gao commented on FLINK-29364:
-

Thanks [~wanglijie] for checking, and [~afedulov] could you help double-check 
this?

> Root cause of Exceptions thrown in the SourceReader start() method gets 
> "swallowed".
> 
>
> Key: FLINK-29364
> URL: https://issues.apache.org/jira/browse/FLINK-29364
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.15.2
>Reporter: Alexander Fedulov
>Priority: Major
>
> If an exception is thrown in the {_}SourceReader{_}'s _start()_ method, its 
> root cause does not get captured.
> The details are still available here: 
> [Task.java#L758|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758]
> But the execution falls through to 
> [Task.java#L780|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780]
>   and cancels the source invokable without recording the actual root cause.
>  
> How to reproduce: 
> [DataGeneratorSourceITCase.java#L117|https://github.com/afedulov/flink/blob/3df7669fcc6ba08c5147195b80cc97ac1481ec8c/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java#L117]
>  
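
A sketch of the fix pattern under discussion, reduced to stand-in types (the 
real Task code is considerably more involved):

{code:java}
public class PreserveRootCauseSketch {

    interface Invokable {
        void start() throws Exception;
        void cancel();
    }

    static void run(Invokable invokable) throws Exception {
        try {
            invokable.start();
        } catch (Throwable t) {
            // Cancel as before, but keep the original failure as the primary cause
            // instead of reporting only the subsequent cancellation.
            invokable.cancel();
            throw new Exception("Task failed in start()", t);
        }
    }

    public static void main(String[] args) {
        try {
            run(new Invokable() {
                @Override public void start() {
                    throw new IllegalStateException("thrown from SourceReader#start()");
                }
                @Override public void cancel() {}
            });
        } catch (Exception e) {
            e.printStackTrace(); // the root cause is still visible here
        }
    }
}
{code}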



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29852) The operator is repeatedly displayed on the Flink Web UI

2022-11-03 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628149#comment-17628149
 ] 

Yun Gao commented on FLINK-29852:
-

Hi [~JasonLee], thanks for reporting the issue! Can this phenomenon be 
reproduced? And if possible, could you provide the content of the response from 
the web server via http://:/jobs/ ?

> The operator is repeatedly displayed on the Flink Web UI
> 
>
> Key: FLINK-29852
> URL: https://issues.apache.org/jira/browse/FLINK-29852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
> Environment: Flink 1.16.0
>Reporter: JasonLee
>Priority: Major
> Attachments: image-2022-11-02-23-57-39-387.png
>
>
> All the operators in the DAG are shown repeatedly
> !image-2022-11-02-23-57-39-387.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28973) Extending /jars/:jarid/plan API to support setting Flink configs

2022-10-31 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17626512#comment-17626512
 ] 

Yun Gao commented on FLINK-28973:
-

Hi [~Zhanghao Chen], many thanks for the clarification! Is this issue a part of 
https://issues.apache.org/jira/browse/FLINK-27060 ? If so, we may make it a 
sub-issue of FLINK-27060.

> Extending /jars/:jarid/plan API to support setting Flink configs
> 
>
> Key: FLINK-28973
> URL: https://issues.apache.org/jira/browse/FLINK-28973
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Zhanghao Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-10-31 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17626508#comment-17626508
 ] 

Yun Gao edited comment on FLINK-27341 at 10/31/22 9:59 AM:
---

Hi [~nobleyd], sorry: after I checked the code, it looks more complicated than I 
initially thought, so I'm not confident that this part of the code can be 
changed right before the deadline without causing other issues, and we then 
moved the fix to after the release.

I also still think the issue is critical and causes a lot of confusion; we are 
still working on this issue and will fix it soon. 


was (Author: gaoyunhaii):
Hi [~nobleyd] sorry after I checked the code, it looks more complicated than I 
initially thought, thus I'm not very confident in changing this part of code 
right before the deadline without causing other issues and we'll move the fix 
after the release.

I also still think the issue is critical and cause a lot of confusions, we are 
still working on this issue and will fix it soon. 

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Chesnay Schepler
>Priority: Major
>
> If some TaskManagers run on the same machine as the JobManager while other 
> TaskManagers do not, the TaskManagers running together with the JobManager 
> would bind to localhost or 127.0.0.1, which makes the Netty connections across 
> the TaskManagers fail.
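
For illustration, a workaround-shaped configuration would separate the bind 
address from the advertised address in flink-conf.yaml (the option names below 
exist in 1.15+; whether this fully avoids the issue in the mixed setup described 
is an assumption):

{code}
# flink-conf.yaml on a machine that hosts both a JobManager and a TaskManager:
# bind to all interfaces so remote TaskManagers can still reach this one,
# while advertising the machine's real address to the cluster.
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 10.0.0.17      # hypothetical externally reachable address
{code}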



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27341) TaskManager running together with JobManager are bind to 127.0.0.1

2022-10-31 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17626508#comment-17626508
 ] 

Yun Gao commented on FLINK-27341:
-

Hi [~nobleyd], sorry: after I checked the code, it looks more complicated than I 
initially thought, so I'm not confident that this part of the code can be 
changed right before the deadline without causing other issues, and we'll move 
the fix to after the release.

I also still think the issue is critical and causes a lot of confusion; we are 
still working on this issue and will fix it soon. 

> TaskManager running together with JobManager are bind to 127.0.0.1
> --
>
> Key: FLINK-27341
> URL: https://issues.apache.org/jira/browse/FLINK-27341
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Chesnay Schepler
>Priority: Major
>
> If some TaskManagers run on the same machine as the JobManager while other 
> TaskManagers do not, the TaskManagers running together with the JobManager 
> would bind to localhost or 127.0.0.1, which makes the Netty connections across 
> the TaskManagers fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29610) Infinite timeout is used in SavepointHandlers calls to RestfulGateway

2022-10-31 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17626504#comment-17626504
 ] 

Yun Gao commented on FLINK-29610:
-

Hi [~Jiale], it looks to me like RestConfiguration.timeout is used for the REST 
handlers' requests, while the above methods use INF_TIMEOUT when making requests 
to the dispatcher, which is in fact an Akka actor. 
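
The shape of the change being discussed might look roughly like this (the 
gateway and method signatures below are simplified stand-ins, not the actual 
SavepointHandlers code):

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FiniteAskTimeoutSketch {

    /** Stand-in for the RestfulGateway call that today receives RpcUtils.INF_TIMEOUT. */
    interface Gateway {
        CompletableFuture<String> triggerSavepoint(String jobId, Duration rpcTimeout);
    }

    static CompletableFuture<String> trigger(Gateway gateway, String jobId) {
        Duration askTimeout = Duration.ofSeconds(10); // hypothetical ASK_TIMEOUT_DURATION value
        return gateway.triggerSavepoint(jobId, askTimeout)
                // Bound the client-side wait as well, so a stuck dispatcher call fails fast.
                .orTimeout(askTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}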

> Infinite timeout is used in SavepointHandlers calls to RestfulGateway
> -
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Assignee: Jiale Tan
>Priority: Major
>  Labels: pull-request-available
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
>  
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will

2022-10-27 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-29565:
---

Assignee: wangshiwei

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Assignee: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This kind of '/*' content appears in the log printed by the 
> logEnvironmentInfo method of the EnvironmentInformation class. The logs that 
> follow are treated as commented out and lose their highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17625435#comment-17625435
 ] 

Yun Gao commented on FLINK-29565:
-

Thanks [~1336399775] for helping fix the issue! I have assigned the issue to 
you. 

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This kind of '/*' content appears in the log printed by the 
> logEnvironmentInfo method of the EnvironmentInformation class. The logs that 
> follow are treated as commented out and lose their highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17625396#comment-17625396
 ] 

Yun Gao commented on FLINK-29565:
-

Hi [~1336399775], would you then like to open a PR to fix this issue? 

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This kind of '/*' content appears in the log printed by the 
> logEnvironmentInfo method of the EnvironmentInformation class. The logs that 
> follow are treated as commented out and lose their highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28973) Extending /jars/:jarid/plan API to support setting Flink configs

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624941#comment-17624941
 ] 

Yun Gao commented on FLINK-28973:
-

Hi [~Zhanghao Chen], let me double-check the requirements: currently Flink has 
two types of config options, namely cluster-level and job-level ones. The 
cluster-level configurations are set at cluster startup and cannot be changed 
at job submission, while the job-level ones can be changed via the programming 
APIs. 
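
For concreteness, the extension under discussion might look like this on the 
wire; the "flinkConfiguration" field below is the proposal, not an existing 
parameter of the plan endpoint:

{code}
# Hypothetical request shape: compute the plan of an uploaded jar while
# passing job-level overrides alongside the program arguments.
curl -X POST http://jobmanager:8081/jars/abc123_job.jar/plan \
  -H 'Content-Type: application/json' \
  -d '{
        "programArgs": "--input /tmp/in --output /tmp/out",
        "flinkConfiguration": {"pipeline.name": "my-job", "parallelism.default": "4"}
      }'
{code}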

> Extending /jars/:jarid/plan API to support setting Flink configs
> 
>
> Key: FLINK-28973
> URL: https://issues.apache.org/jira/browse/FLINK-28973
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Zhanghao Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29339) JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager blocks main thread

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624908#comment-17624908
 ] 

Yun Gao commented on FLINK-29339:
-

The PR was already under review; let's continue the review on the pull request 
page. 

> JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager 
> blocks main thread
> -
>
> Key: FLINK-29339
> URL: https://issues.apache.org/jira/browse/FLINK-29339
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Chesnay Schepler
>Assignee: Xuannan Su
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> {code:java}
> private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager(
>         IntermediateDataSetID intermediateDataSetID) {
>     Preconditions.checkNotNull(
>             resourceManagerGateway, "JobMaster is not connected to ResourceManager");
>     try {
>         return this.resourceManagerGateway
>                 .getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
>                 .get(); // <-- there's your problem
>     } catch (Throwable e) {
>         throw new RuntimeException(
>                 String.format(
>                         "Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager",
>                         intermediateDataSetID),
>                 e);
>     }
> }
> {code}
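
For contrast, a non-blocking variant would hand the future back to the caller 
instead of waiting on it; this is only a sketch with stand-in types, not the 
actual fix from the PR:

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NonBlockingFetchSketch {

    /** Stand-in for the ResourceManager gateway used by JobMasterPartitionTrackerImpl. */
    interface ResourceManagerGateway {
        CompletableFuture<List<String>> getClusterPartitionsShuffleDescriptors(String dataSetId);
    }

    /**
     * Return the future instead of blocking on get(), so the JobMaster main
     * thread stays free; callers compose on the result asynchronously.
     */
    static CompletableFuture<List<String>> requestShuffleDescriptors(
            ResourceManagerGateway gateway, String dataSetId) {
        return gateway
                .getClusterPartitionsShuffleDescriptors(dataSetId)
                .exceptionally(e -> {
                    throw new RuntimeException(
                            "Failed to get shuffle descriptors of intermediate dataset "
                                    + dataSetId + " from ResourceManager",
                            e);
                });
    }
}
{code}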



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29565) In Flink per job mode, the logs printed by taskManager on the web UI will not be highlighted, because the log contents are annotated due to special symbols, which will

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624900#comment-17624900
 ] 

Yun Gao commented on FLINK-29565:
-

Also cc [~junhan] 

> In Flink per job mode, the logs printed by taskManager on the web UI will not 
> be highlighted, because the log contents are annotated due to special 
> symbols, which will affect the use experience. For more information, see Fig
> 
>
> Key: FLINK-29565
> URL: https://issues.apache.org/jira/browse/FLINK-29565
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.3
>Reporter: wangshiwei
>Priority: Minor
> Attachments: image-2022-10-10-18-40-27-721.png, 
> image-2022-10-10-18-43-31-897.png, image-2022-10-10-18-43-53-713.png, 
> image-2022-10-10-18-45-17-228.png, image-2022-10-10-19-02-29-796.png, 
> image-2022-10-10-19-03-27-670.png
>
>
>  
> !image-2022-10-10-19-03-27-670.png|width=580,height=317!
> !image-2022-10-10-18-43-53-713.png|width=726,height=47!
> This kind of '/*' content appears in the log printed by the 
> logEnvironmentInfo method of the EnvironmentInformation class. The logs that 
> follow are treated as commented out and lose their highlighting.
> *verification*
> !image-2022-10-10-18-45-17-228.png|width=880,height=161!
> After manually printing '*/' in the business code, the log renders normally.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29610) Infinite timeout is used in SavepointHandlers calls to RestfulGateway

2022-10-27 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-29610:
---

Assignee: Jiale Tan

> Infinite timeout is used in SavepointHandlers calls to RestfulGateway
> -
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Assignee: Jiale Tan
>Priority: Major
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
>  
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29610) Infinite timeout is used in SavepointHandlers calls to RestfulGateway

2022-10-27 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624896#comment-17624896
 ] 

Yun Gao commented on FLINK-29610:
-

Thanks [~Jiale] for tracking the issue! I assigned the issue to you. 

> Infinite timeout is used in SavepointHandlers calls to RestfulGateway
> -
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Priority: Major
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
>  
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-10-26 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624867#comment-17624867
 ] 

Yun Gao commented on FLINK-29557:
-

Hi [~aitozi], LGTM, many thanks for tracking the issue down. Would you like to 
open a PR to fix it? Either way is OK with me. 

> The SinkOperator's OutputFormat function is not recognized
> --
>
> Key: FLINK-29557
> URL: https://issues.apache.org/jira/browse/FLINK-29557
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and registered 
> as a {{SimpleOutputFormatOperatorFactory}}, so the output format information in 
> {{SinkOperator}} is lost. As a result, hook functions like 
> {{FinalizeOnMaster}} will have no chance to be executed.
> Because {{SinkOperator}} lives in the table module, it can't be referenced 
> directly from {{flink-streaming-java}}. So maybe we need to introduce an extra 
> common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator 
> and handle it individually.
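
The shape of such a fix might be to dispatch on a shared capability rather than 
on the concrete class; the sketch below uses stand-in types only:

{code:java}
/**
 * Sketch of the idea in the description: dispatch on a shared capability
 * rather than on the concrete StreamSink class, so a table-module sink
 * operator can also expose its OutputFormat. All types here are stand-ins.
 */
public class OperatorFactoryDispatchSketch {

    interface StreamOperator {}
    interface ExposesOutputFormat { String getOutputFormat(); }

    static class StreamSink implements StreamOperator, ExposesOutputFormat {
        public String getOutputFormat() { return "csv-output-format"; }
    }

    // Hypothetical common class suggested in the description.
    static class SinkFunctionOperator implements StreamOperator, ExposesOutputFormat {
        public String getOutputFormat() { return "table-sink-output-format"; }
    }

    static String of(StreamOperator operator) {
        if (operator instanceof ExposesOutputFormat) {
            // Keeps the OutputFormat visible so hooks like FinalizeOnMaster can run.
            return "SimpleOutputFormatOperatorFactory("
                    + ((ExposesOutputFormat) operator).getOutputFormat() + ")";
        }
        return "SimpleOperatorFactory";
    }

    public static void main(String[] args) {
        System.out.println(of(new StreamSink()));
        System.out.println(of(new SinkFunctionOperator()));
    }
}
{code}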



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-10-25 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623601#comment-17623601
 ] 

Yun Gao commented on FLINK-29557:
-

Thanks [~aitozi] for reporting the issue! Now I understand why the issue occurs. 
I'll double-check the proper way to fix it; do you already have some thoughts on 
it?

> The SinkOperator's OutputFormat function is not recognized
> --
>
> Key: FLINK-29557
> URL: https://issues.apache.org/jira/browse/FLINK-29557
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and registered 
> as a {{SimpleOutputFormatOperatorFactory}}, so the output format information in 
> {{SinkOperator}} is lost. As a result, hook functions like 
> {{FinalizeOnMaster}} will have no chance to be executed.
> Because {{SinkOperator}} lives in the table module, it can't be referenced 
> directly from {{flink-streaming-java}}. So maybe we need to introduce an extra 
> common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator 
> and handle it individually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2022-10-24 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623149#comment-17623149
 ] 

Yun Gao commented on FLINK-29459:
-

Hi [~fpaul] [~KristoffSC], many thanks for fixing the issues, and sorry for 
having missed the previous notifications during the holiday. Regarding the 
current sink v2 mechanism I have some more thoughts:

Currently we rely on the CommittableSummary and CommittableWithLineage messages 
to coordinate between Writers and Committers. For each checkpoint, each Writer 
subtask first emits a CommittableSummary to the Committers, which contains the 
number of Committables to send. Then the Writer subtask emits that number of 
CommittableWithLineage messages to the Committers. The Committers rely on the 
number in the summary to detect whether they have received all the Committables 
from each Writer subtask. But the mechanism has some issues:
 # It can only support partitioners with one target per source subtask between 
Writer and Committer, like forward / rescale. If in the long run we want to 
support Committers with arbitrary parallelism, it might cause issues when the 
Writer and the Committer have different parallelisms. Similarly, it also 
complicates things for authors of connectors that use a pre-commit topology. 
 # With unaligned checkpoints and rescaling after recovery, if some 
CommittableSummary messages have been processed and stored in the snapshot, but 
the corresponding CommittableWithLineage messages have been assigned to other 
tasks, the number of Committables would not be correct. 

One possible alternative: instead of relying on numbers, we might first emit 
the Committables, followed by a broadcast message that confirms the end of a 
checkpoint. The Committer would know that it has received all the Committables 
once it has received the confirmation messages from all the upstream tasks. The 
mechanism is a bit like how watermarks work. Then for the above two issues:


 # It would support all the partitioners. 
 # For the unaligned checkpoint and rescaling case, we could simply commit all 
the Committables with the startup checkpoint id and ignore all the confirmation 
messages of the same checkpoint id on startup. We could then wait for the 
confirmation message of the next checkpoint id to mark all the previous 
checkpoints as finished. 

What do you think about this? Sorry if I have overlooked something. 
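
As a toy model of the confirmation-based alternative sketched above (all types 
and names here are invented for illustration; the real sink v2 messages are 
different classes):

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Committer side: commit once an end-of-checkpoint marker arrived from every upstream subtask. */
public class ConfirmationProtocolSketch {

    private final int numUpstreamSubtasks;
    private final Map<Long, Set<Integer>> confirmations = new HashMap<>();

    ConfirmationProtocolSketch(int numUpstreamSubtasks) {
        this.numUpstreamSubtasks = numUpstreamSubtasks;
    }

    void onCommittable(long checkpointId, String committable) {
        // Buffer the committable; no per-subtask count is needed, so any
        // partitioner between writer and committer works.
        System.out.println("buffered " + committable + " for checkpoint " + checkpointId);
    }

    /** Broadcast marker, analogous to a watermark: "subtask X sent everything for checkpoint N". */
    void onEndOfCheckpoint(long checkpointId, int upstreamSubtask) {
        Set<Integer> seen = confirmations.computeIfAbsent(checkpointId, k -> new HashSet<>());
        seen.add(upstreamSubtask);
        if (seen.size() == numUpstreamSubtasks) {
            System.out.println("checkpoint " + checkpointId + " complete, committing");
        }
    }
}
{code}

Because completeness is detected from per-subtask markers rather than counts, 
the committer no longer depends on which partitioner delivered the committables.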

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> --
>
> Key: FLINK-29459
> URL: https://issues.apache.org/jira/browse/FLINK-29459
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Currently, when supporting Sink implementations using the version 1 
> interface, there are issues when restoring from a checkpoint after failover:
>  # In the global committer operator, when restoring SubtaskCommittableManager, 
> the subtask id is replaced with the one of the current operator. This means 
> that the id is originally the id of the sender task (0 ~ N - 1), but after 
> restoring it becomes 0. This causes a duplicate key exception during 
> restore.
>  # For the Committer operator, the subtaskId of CheckpointCommittableManagerImpl 
> is always restored to 0 after failover for all the subtasks. This makes the 
> summary sent to the Global Committer carry the wrong subtask id.
>  # For the Committer operator, the checkpoint id of SubtaskCommittableManager is 
> always restored to 1 after failover, which makes the following committables sent 
> to the global committer carry the wrong checkpoint id. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-10-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621460#comment-17621460
 ] 

Yun Gao commented on FLINK-29498:
-

[~eric.xiao] Thanks for the PR! I'll have a look.

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.2
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>
> We are using the async I/O to make HTTP calls and one of the features we 
> wanted to leverage was the retries, so we pulled the newest commit: 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error 
> because {{unorderedWaitWithRetry}} expects a Scala retry strategy. The problem 
> is that retry strategies were only implemented in Java and not in Scala in 
> this PR: [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}
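
One possible interim workaround is to bypass the Scala wrapper and drive the 
Java API directly; this is an untested sketch that assumes the {{javaStream}} 
accessor and the Java {{unorderedWaitWithRetry}} overload shown below:

{code}
import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JAsyncDataStream}
import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JAsyncFunction, ResultFuture}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => JAsyncRetryStrategies}

object RetryWorkaroundSketch {

  // The Java entry point needs a Java AsyncFunction rather than the Scala one.
  class DoublingAsyncFunction extends JAsyncFunction[Int, Int] {
    override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit =
      resultFuture.complete(Collections.singleton(input * 2))
  }

  def withRetry(source: DataStream[Int], timeoutMs: Long): DataStream[Int] = {
    val retryStrategy =
      new JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L).build()
    // Unwrap to the Java stream, apply the Java-only retry variant, wrap back.
    new DataStream(
      JAsyncDataStream.unorderedWaitWithRetry(
        source.javaStream,
        new DoublingAsyncFunction,
        timeoutMs,
        TimeUnit.MILLISECONDS,
        retryStrategy))
  }
}
{code}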



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29183) flink job ended before window time up while using bounded stream source

2022-10-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620855#comment-17620855
 ] 

Yun Gao edited comment on FLINK-29183 at 10/20/22 7:39 AM:
---

Hi [~landlord], this is indeed a known issue: currently Flink ignores all the 
timers registered for after the end of a bounded stream. Recently we have had 
some discussion on this issue under 
https://issues.apache.org/jira/browse/FLINK-18647.   

We are planning to allow users to specify how to deal with the timers at the 
end of streams. I'll open a FLIP and a discussion for that soon. 

If that is OK with you, I'll close this issue for now and let's unify the 
discussion in FLINK-18647 and the mailing list. 


was (Author: gaoyunhaii):
Hi [~landlord] , this is indeed a known issue that currently Flink will ignores 
all the register times at the end of bounded stream. Recently we have some 
discussion on this issue under 
https://issues.apache.org/jira/browse/FLINK-18647.   

We are planning to allow users to specify how to deal with the timers at the 
end of streams. I'll open a FLIP and discussion for that soon. 

> flink job ended before window time up while using bounded stream source
> ---
>
> Key: FLINK-29183
> URL: https://issues.apache.org/jira/browse/FLINK-29183
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
> Attachments: image-2022-09-02-15-33-08-459.png, 
> image-2022-09-02-15-35-18-590.png, image-2022-09-06-18-09-40-132.png, 
> image-2022-09-06-18-09-43-745.png, image-2022-09-06-18-10-11-848.png, 
> image-2022-09-06-18-11-02-915.png
>
>
> !image-2022-09-02-15-35-18-590.png|width=574,height=306!
>  
> When I use a long time window on a bounded stream in STREAMING mode, the job 
> shuts down before the window has been triggered, and then I have nothing to 
> print.
> If I use BATCH mode with the same code, it works. But the batch shuffle is so 
> expensive that I still prefer STREAMING. 
> Is there any way to fix this problem? Or is there a mistake in my code?
> Looking forward to your reply.  Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29183) flink job ended before window time up while using bounded stream source

2022-10-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620855#comment-17620855
 ] 

Yun Gao commented on FLINK-29183:
-

Hi [~landlord], this is indeed a known issue: currently Flink ignores all the 
timers registered for after the end of a bounded stream. Recently we have had 
some discussion on this issue under 
https://issues.apache.org/jira/browse/FLINK-18647.   

We are planning to allow users to specify how to deal with the timers at the 
end of streams. I'll open a FLIP and a discussion for that soon. 
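
To make the reported symptom concrete, a minimal job of the shape described (a 
processing-time window on a bounded source) finishes before the window can 
fire; this repro sketch uses only standard DataStream APIs:

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BoundedWindowRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
                .keyBy(t -> t.f0)
                // The bounded source finishes immediately, so this 10-minute
                // processing-time window's timer is dropped and nothing prints.
                .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
                .sum(1)
                .print();
        env.execute("bounded-window-repro");
    }
}
{code}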

> flink job ended before window time up while using bounded stream source
> ---
>
> Key: FLINK-29183
> URL: https://issues.apache.org/jira/browse/FLINK-29183
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.2
>Reporter: Lu
>Priority: Major
> Attachments: image-2022-09-02-15-33-08-459.png, 
> image-2022-09-02-15-35-18-590.png, image-2022-09-06-18-09-40-132.png, 
> image-2022-09-06-18-09-43-745.png, image-2022-09-06-18-10-11-848.png, 
> image-2022-09-06-18-11-02-915.png
>
>
> !image-2022-09-02-15-35-18-590.png|width=574,height=306!
>  
> When I use a long time window on a bounded stream in STREAMING mode, the job 
> shuts down before the window has been triggered, and then I have nothing to 
> print.
> If I use BATCH mode with the same code, it works. But the batch shuffle is so 
> expensive that I still prefer STREAMING. 
> Is there any way to fix this problem? Or is there a mistake in my code?
> Looking forward to your reply.  Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29610) Infinite timeout is used in SavepointHandlers calls to RestfulGateway

2022-10-20 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620844#comment-17620844
 ] 

Yun Gao commented on FLINK-29610:
-

Thanks [~Jiale] for creating the issue. Since the handler seems to only submit 
a request to the timer thread of the CheckpointCoordinator, it looks to me like 
we could use ASK_TIMEOUT_DURATION for these RPC calls as well. 

> Infinite timeout is used in SavepointHandlers calls to RestfulGateway
> -
>
> Key: FLINK-29610
> URL: https://issues.apache.org/jira/browse/FLINK-29610
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Jiale Tan
>Priority: Major
>
> In {{{}SavepointHandlers{}}}, both 
> {{[StopWithSavepointHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L214]}}
>  and 
> {{[SavepointTriggerHandler|https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L258]}}
>  are calling {{RestfulGateway}} with {{RpcUtils.INF_TIMEOUT}}
>  
> As pointed out in 
> [this|https://github.com/apache/flink/pull/20852#discussion_r992218970] 
> discussion, we will need to either figure out why {{RpcUtils.INF_TIMEOUT}} is 
> used, or remove it if there is no strong reason to use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29498) Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API

2022-10-19 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620102#comment-17620102
 ] 

Yun Gao commented on FLINK-29498:
-

Thanks [~eric.xiao] for the efforts! I also think we could add the improvement 
first, since the Scala API will still exist for several versions after being 
deprecated. I have assigned the issue to you. 

> Flink Async I/O Retry Strategies Do Not Work for Scala AsyncDataStream API
> --
>
> Key: FLINK-29498
> URL: https://issues.apache.org/jira/browse/FLINK-29498
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.15.2
>Reporter: Eric Xiao
>Assignee: Eric Xiao
>Priority: Minor
>
> We are using the async I/O to make HTTP calls and one of the features we 
> wanted to leverage was the retries, so we pulled the newest commit: 
> [http://github.com/apache/flink/pull/19983] into our internal Flink fork.
> When I try calling the function {{AsyncDataStream.unorderedWaitWithRetry}} 
> from the Scala API with a retry strategy from the Java API, I get an error 
> because {{unorderedWaitWithRetry}} expects a Scala retry strategy. The problem 
> is that retry strategies were only implemented in Java and not in Scala in 
> this PR: [http://github.com/apache/flink/pull/19983].
>  
> Here is some of the code to reproduce the error:
> {code:java}
> import org.apache.flink.streaming.api.scala.AsyncDataStream
> import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => 
> JAsyncRetryStrategies}
> val javaAsyncRetryStrategy = new 
> JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 100L)
> .build()
> val data = AsyncDataStream.unorderedWaitWithRetry(
>   source,
>   asyncOperator,
>   pipelineTimeoutInMs,
>   TimeUnit.MILLISECONDS,
>   javaAsyncRetryStrategy
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

