[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-18 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-20618:
---
Affects Version/s: (was: 1.10.2)
   (was: 1.11.0)
   1.10.0
   1.11.1

> Some of the source operator subtasks will stuck when flink job in critical 
> backpressure
> ---
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.11.1
>Reporter: zlzhang0122
>Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 
> 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png, stuck_node.txt, 
> stuck_node_downstream.txt
>
>
> Under critical backpressure, some source subtasks block while requesting a 
> buffer because the LocalBufferPool is exhausted, so the whole task gets 
> stuck while the other tasks keep running.
> Below is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) - SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) - Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x7f43d07e1800 nid=0x1b1c waiting on condition [0x7f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for 0xdb234488 (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at 
> 
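The trace shows the legacy source thread parked inside CompletableFuture.get() in LocalBufferPool.requestMemorySegmentBlocking: the pool has no free segments, so the requester waits until a downstream consumer recycles one. Below is a rough, self-contained sketch of that blocking hand-off. The class and method names are hypothetical (not Flink's actual implementation, which parks on an availability CompletableFuture rather than a BlockingQueue); it only illustrates why the thread shows up as WAITING in jstack.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy stand-in for a bounded buffer pool; names are illustrative only.
class ToyBufferPool {
    private final BlockingQueue<byte[]> available;

    ToyBufferPool(int numBuffers, int bufferSize) {
        available = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    // Analogous to requestMemorySegmentBlocking: parks while the pool is empty.
    byte[] requestBufferBlocking() throws InterruptedException {
        return available.take();
    }

    // Analogous to recycling a segment; unparks one blocked requester.
    // (add() is safe here because we never recycle more buffers than the
    // pool's capacity in this sketch.)
    void recycle(byte[] buffer) {
        available.add(buffer);
    }

    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(1, 1024);
        byte[] buf = pool.requestBufferBlocking(); // pool now exhausted

        Thread requester = new Thread(() -> {
            try {
                pool.requestBufferBlocking(); // parks, like the source thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        requester.start();
        Thread.sleep(200);
        // Typically prints WAITING: the same parked state as in the jstack dump.
        System.out.println("requester state: " + requester.getState());

        pool.recycle(buf); // downstream frees a buffer -> requester unblocks
        requester.join(5000);
        System.out.println("requester finished: " + !requester.isAlive());
    }
}
```

If no buffer is ever recycled (e.g. the downstream task itself is stuck), the requester stays parked forever, which matches the reported symptom of individual source subtasks hanging while others run.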

[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-17 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: stuck_node_downstream.txt


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-17 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: stuck_node.txt


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-17 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: 2020-12-17 16-45-00 的屏幕截图.png


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: (was: 2020-12-17 10-51-42 的屏幕截图.png)


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: 2020-12-17 11-10-06 的屏幕截图.png


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: 2020-12-17 10-51-42 的屏幕截图.png

> Some of the source operator subtasks will stuck when flink job in critical 
> backpressure
> ---
>
> Key: FLINK-20618
> URL: https://issues.apache.org/jira/browse/FLINK-20618
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.11.0, 1.10.2
>Reporter: zlzhang0122
>Priority: Critical
> Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 
> 2020-12-17 10-51-42 的屏幕截图.png
>
>
> In some critical backpressure situation, some of the subtasks of source will 
> blocked to request buffer because of the LocalBufferPool is full,so the whole 
> task will be stuck and the other task run well.
> Bellow is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, 
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) 
> - SourceConversion(table=[default_catalog.default_database.transfer_c5, 
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, 
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, 
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, 
> labels, pdl]) - Calc(select=[hash, timestamp, step, isCustomize, 
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, 
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 
> os_prio=0 tid=0x7f43d07e1800 nid=0x1b1c waiting on condition 
> [0x7f43b8488000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for 0xdb234488 (a 
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at StreamExecCalc$33.processElement(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
> at SourceConversion$4.processElement(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
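The parked thread above is blocked inside LocalBufferPool.requestMemorySegmentBlocking, waiting on a CompletableFuture that only completes once a downstream consumer recycles a network buffer. A minimal sketch of that blocking-request pattern is below; ToyBufferPool and BackpressureDemo are hypothetical names for illustration only, not Flink's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified pool illustrating the pattern in the stack trace:
// a requester that finds no free memory segment parks on a CompletableFuture
// (hence CompletableFuture$Signaller in the jstack output) until a downstream
// consumer recycles a segment. This is NOT Flink's real LocalBufferPool.
class ToyBufferPool {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();
    // Completed while at least one segment is free; replaced with a fresh,
    // incomplete future when the pool drains.
    private CompletableFuture<Void> availability =
            CompletableFuture.completedFuture(null);

    ToyBufferPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    // Mirrors the shape of requestMemorySegmentBlocking: loop, and block on
    // the availability future whenever the pool is empty.
    byte[] requestMemorySegmentBlocking() throws Exception {
        while (true) {
            CompletableFuture<Void> toWaitOn;
            synchronized (this) {
                byte[] segment = segments.poll();
                if (segment != null) {
                    if (segments.isEmpty()) {
                        availability = new CompletableFuture<>();
                    }
                    return segment;
                }
                toWaitOn = availability;
            }
            toWaitOn.get(); // thread parks here, as in the jstack output above
        }
    }

    synchronized void recycle(byte[] segment) {
        segments.add(segment);
        availability.complete(null); // wake any blocked requesters
    }
}

class BackpressureDemo {
    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(1, 32);
        byte[] held = pool.requestMemorySegmentBlocking();

        // Simulate a slow downstream task that frees the buffer later.
        Thread downstream = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            pool.recycle(held);
        });
        downstream.start();

        // The "source" blocks here until the downstream recycles. Under
        // sustained backpressure the recycle never happens, so the wait
        // never ends and the subtask appears stuck.
        byte[] next = pool.requestMemorySegmentBlocking();
        System.out.println("unblocked, got segment of " + next.length + " bytes");
        downstream.join();
    }
}
```

In the reported job the recycle side never catches up (critical backpressure), so the blocking request in the source subtask waits indefinitely, matching the WAITING (parking) state shown above.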

[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-16 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-20618:
-
Component/s: (was: API / Core)
 Runtime / Network

[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-15 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: 2020-12-16 11-53-42 的屏幕截图.png
2020-12-16 11-49-01 的屏幕截图.png


[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

2020-12-15 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-20618:

Attachment: 2020-12-16 11-48-30 的屏幕截图.png
2020-12-16 11-47-37 的屏幕截图.png
