[jira] [Commented] (FLINK-13247) Implement external shuffle service for YARN

2021-04-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325754#comment-17325754
 ] 

Zhijiang commented on FLINK-13247:
--

[~wind_ljy] Glad to hear that you have implemented this feature in your company 
and would like to contribute it. I believe it would be a good enhancement to 
Flink batch jobs. But I guess most of the committers are pretty busy with the 
current release work at the moment, so it is better to prepare your detailed 
design first; the community can then judge whether it can be covered in the 
next release cycle.

> Implement external shuffle service for YARN
> ---
>
> Key: FLINK-13247
> URL: https://issues.apache.org/jira/browse/FLINK-13247
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: MalcolmSanders
>Priority: Major
>
> Flink batch job users could achieve better cluster utilization and job 
> throughput through an external shuffle service, because the producers of 
> intermediate result partitions can be released once the intermediate result 
> partitions have been persisted on disk. In 
> [FLINK-10653|https://issues.apache.org/jira/browse/FLINK-10653], [~zjwang] 
> has introduced a pluggable shuffle manager architecture which abstracts the 
> process of data transfer between stages out of the Flink runtime as a shuffle 
> service. I propose a YARN implementation of the Flink external shuffle 
> service, since YARN is widely used in various companies.
> The basic idea is as follows (a rough sketch of step (2) follows after this 
> list):
> (1) Producers write intermediate result partitions to local disks assigned by 
> the NodeManager;
> (2) YARN shuffle servers, deployed on each NodeManager as an auxiliary 
> service, are notified of intermediate result partition descriptions by 
> producers;
> (3) Consumers fetch intermediate result partitions from the YARN shuffle 
> servers.
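
As a rough illustration of step (2), here is a hedged sketch of a shuffle 
server hosted as a YARN auxiliary service. Only the Hadoop AuxiliaryService 
API is real; the class name, config key, default port, and metadata handling 
are hypothetical placeholders, not the proposed design.

{code:java}
// Hedged sketch only: a skeleton of a YARN auxiliary service hosting the
// shuffle server. "YarnShuffleService" and "flink.shuffle.port" are made up.
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

public class YarnShuffleService extends AuxiliaryService {

    private int port;

    public YarnShuffleService() {
        super("flink_shuffle");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        // Start the shuffle server on a configured port; producers would
        // register intermediate result partition descriptors here, and
        // consumers would fetch the persisted partition data.
        this.port = conf.getInt("flink.shuffle.port", 15266);
        super.serviceInit(conf);
    }

    @Override
    public void initializeApplication(ApplicationInitializationContext ctx) {
        // Track the application so its partition files can be cleaned up later.
    }

    @Override
    public void stopApplication(ApplicationTerminationContext ctx) {
        // Remove state and delete partition files of the finished application.
    }

    @Override
    public ByteBuffer getMetaData() {
        // Handed back to containers, e.g. to advertise the server port.
        return ByteBuffer.allocate(4).putInt(0, port);
    }
}
{code}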



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21858) TaskMetricGroup taskName is too long, especially in sql tasks.

2021-03-18 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304031#comment-17304031
 ] 

Zhijiang commented on FLINK-21858:
--

I have assigned it to you, but it is better to confirm the solution with 
[~chesnay] before submitting the PR. :)

> TaskMetricGroup taskName is too long, especially in sql tasks.
> --
>
> Key: FLINK-21858
> URL: https://issues.apache.org/jira/browse/FLINK-21858
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
>Reporter: Andrew.D.lin
>Assignee: Andrew.D.lin
>Priority: Major
>
> Currently operatorName is limited to 80 characters by 
> org.apache.flink.runtime.metrics.groups.TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH.
> So I propose making the maximum length of the metric name configurable (a 
> rough sketch follows after the example).
>  
> Here is an example:
>  
> "taskName":"GlobalGroupAggregate(groupBy=[dt, src, src1, src2, src3, ct1, 
> ct2], select=[dt, src, src1, src2, src3, ct1, ct2, SUM_RETRACT((sum$0, 
> count$1)) AS sx_pv, SUM_RETRACT((sum$2, count$3)) AS sx_uv, 
> MAX_RETRACT(max$4) AS updt_time, MAX_RETRACT(max$5) AS time_id]) -> 
> Calc(select=[((MD5((dt CONCAT _UTF-16LE'|' CONCAT src CONCAT _UTF-16LE'|' 
> CONCAT src1 CONCAT _UTF-16LE'|' CONCAT src2 CONCAT _UTF-16LE'|' CONCAT src3 
> CONCAT _UTF-16LE'|' CONCAT ct1 CONCAT _UTF-16LE'|' CONCAT ct2 CONCAT 
> _UTF-16LE'|' CONCAT time_id)) SUBSTR 1 SUBSTR 2) CONCAT _UTF-16LE'_' CONCAT 
> (dt CONCAT _UTF-16LE'|' CONCAT src CONCAT _UTF-16LE'|' CONCAT src1 CONCAT 
> _UTF-16LE'|' CONCAT src2 CONCAT _UTF-16LE'|' CONCAT src3 CONCAT _UTF-16LE'|' 
> CONCAT ct1 CONCAT _UTF-16LE'|' CONCAT ct2 CONCAT _UTF-16LE'|' CONCAT 
> time_id)) AS rowkey, sx_pv, sx_uv, updt_time]) -> 
> LocalGroupAggregate(groupBy=[rowkey], select=[rowkey, MAX_RETRACT(sx_pv) AS 
> max$0, MAX_RETRACT(sx_uv) AS max$1, MAX_RETRACT(updt_time) AS max$2, 
> COUNT_RETRACT(*) AS count1$3])"
> "operatorName":"GlobalGroupAggregate(groupBy=[dt, src, src1, src2, src3, ct1, 
> ct2], selec=[dt, s"
>  
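
For illustration, a hedged sketch of what a configurable cap could look like; 
the config key is made up and this is not the actual Flink code:

{code:java}
// Hedged sketch: cap a task/operator name at a configurable maximum length
// before it is used as a metric name. "metrics.name.max-length" is a
// hypothetical key, not an existing Flink option.
static String truncateName(String name, int maxLength) {
    if (name == null || name.length() <= maxLength) {
        return name;
    }
    // Keep the head, which usually carries the operator type and groupBy keys.
    return name.substring(0, maxLength);
}

// usage, assuming the limit comes from the user configuration:
// int max = configuration.getInteger("metrics.name.max-length", 80);
// String metricName = truncateName(taskName, max);
{code}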



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-21858) TaskMetricGroup taskName is too long, especially in sql tasks.

2021-03-18 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-21858:


Assignee: Andrew.D.lin

> TaskMetricGroup taskName is too long, especially in sql tasks.
> --
>
> Key: FLINK-21858
> URL: https://issues.apache.org/jira/browse/FLINK-21858
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
>Reporter: Andrew.D.lin
>Assignee: Andrew.D.lin
>Priority: Major
>
> Currently operatorName is limited to 80 characters by 
> org.apache.flink.runtime.metrics.groups.TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH.
> So I propose making the maximum length of the metric name configurable.
>  
> Here is an example:
>  
> "taskName":"GlobalGroupAggregate(groupBy=[dt, src, src1, src2, src3, ct1, 
> ct2], select=[dt, src, src1, src2, src3, ct1, ct2, SUM_RETRACT((sum$0, 
> count$1)) AS sx_pv, SUM_RETRACT((sum$2, count$3)) AS sx_uv, 
> MAX_RETRACT(max$4) AS updt_time, MAX_RETRACT(max$5) AS time_id]) -> 
> Calc(select=[((MD5((dt CONCAT _UTF-16LE'|' CONCAT src CONCAT _UTF-16LE'|' 
> CONCAT src1 CONCAT _UTF-16LE'|' CONCAT src2 CONCAT _UTF-16LE'|' CONCAT src3 
> CONCAT _UTF-16LE'|' CONCAT ct1 CONCAT _UTF-16LE'|' CONCAT ct2 CONCAT 
> _UTF-16LE'|' CONCAT time_id)) SUBSTR 1 SUBSTR 2) CONCAT _UTF-16LE'_' CONCAT 
> (dt CONCAT _UTF-16LE'|' CONCAT src CONCAT _UTF-16LE'|' CONCAT src1 CONCAT 
> _UTF-16LE'|' CONCAT src2 CONCAT _UTF-16LE'|' CONCAT src3 CONCAT _UTF-16LE'|' 
> CONCAT ct1 CONCAT _UTF-16LE'|' CONCAT ct2 CONCAT _UTF-16LE'|' CONCAT 
> time_id)) AS rowkey, sx_pv, sx_uv, updt_time]) -> 
> LocalGroupAggregate(groupBy=[rowkey], select=[rowkey, MAX_RETRACT(sx_pv) AS 
> max$0, MAX_RETRACT(sx_uv) AS max$1, MAX_RETRACT(updt_time) AS max$2, 
> COUNT_RETRACT(*) AS count1$3])"
> "operatorName":"GlobalGroupAggregate(groupBy=[dt, src, src1, src2, src3, ct1, 
> ct2], selec=[dt, s"
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13247) Implement external shuffle service for YARN

2021-01-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268352#comment-17268352
 ] 

Zhijiang commented on FLINK-13247:
--

Thanks [~trohrmann] for the operation. Actually I tried to do that when I last 
commented, but it failed unexpectedly :(

> Implement external shuffle service for YARN
> ---
>
> Key: FLINK-13247
> URL: https://issues.apache.org/jira/browse/FLINK-13247
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: MalcolmSanders
>Priority: Minor
>
> Flink batch job users could achieve better cluster utilization and job 
> throughput through an external shuffle service, because the producers of 
> intermediate result partitions can be released once the intermediate result 
> partitions have been persisted on disk. In 
> [FLINK-10653|https://issues.apache.org/jira/browse/FLINK-10653], [~zjwang] 
> has introduced a pluggable shuffle manager architecture which abstracts the 
> process of data transfer between stages out of the Flink runtime as a shuffle 
> service. I propose a YARN implementation of the Flink external shuffle 
> service, since YARN is widely used in various companies.
> The basic idea is as follows:
> (1) Producers write intermediate result partitions to local disks assigned by 
> the NodeManager;
> (2) YARN shuffle servers, deployed on each NodeManager as an auxiliary 
> service, are notified of intermediate result partition descriptions by 
> producers;
> (3) Consumers fetch intermediate result partitions from the YARN shuffle 
> servers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13247) Implement external shuffle service for YARN

2021-01-17 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267020#comment-17267020
 ] 

Zhijiang commented on FLINK-13247:
--

AFAIK, the current assignee [~ssy] will not work on it any more since he has 
already moved to a new position. But I am not sure whether there are other 
candidates to take over this issue in the future.

> Implement external shuffle service for YARN
> ---
>
> Key: FLINK-13247
> URL: https://issues.apache.org/jira/browse/FLINK-13247
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: MalcolmSanders
>Assignee: MalcolmSanders
>Priority: Minor
>
> Flink batch job users could achieve better cluster utilization and job 
> throughput through an external shuffle service, because the producers of 
> intermediate result partitions can be released once the intermediate result 
> partitions have been persisted on disk. In 
> [FLINK-10653|https://issues.apache.org/jira/browse/FLINK-10653], [~zjwang] 
> has introduced a pluggable shuffle manager architecture which abstracts the 
> process of data transfer between stages out of the Flink runtime as a shuffle 
> service. I propose a YARN implementation of the Flink external shuffle 
> service, since YARN is widely used in various companies.
> The basic idea is as follows:
> (1) Producers write intermediate result partitions to local disks assigned by 
> the NodeManager;
> (2) YARN shuffle servers, deployed on each NodeManager as an auxiliary 
> service, are notified of intermediate result partition descriptions by 
> producers;
> (3) Consumers fetch intermediate result partitions from the YARN shuffle 
> servers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17200) Add connectors to ClickHouse

2021-01-05 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259459#comment-17259459
 ] 

Zhijiang commented on FLINK-17200:
--

[~csbliss], I saw your talk at last year's FFA about sinking data into 
ClickHouse from Flink. AFAIK, ClickHouse is very popular as an OLAP engine 
among companies in China, so I guess this contribution would be very helpful. 
What is your plan for contributing it in the way suggested above?

> Add connectors to ClickHouse
> 
>
> Key: FLINK-17200
> URL: https://issues.apache.org/jira/browse/FLINK-17200
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: jinhai
>Priority: Major
>
> ClickHouse is a powerful OLAP query engine that supports real-time mini-batch 
> data writing.
> We can add Flink connectors for ClickHouse.
> [website: https://clickhouse.tech/|https://clickhouse.tech/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20743) Print ContainerId For RemoteTransportException

2020-12-29 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255906#comment-17255906
 ] 

Zhijiang commented on FLINK-20743:
--

Already assigned.

> Print ContainerId For RemoteTransportException
> --
>
> Key: FLINK-20743
> URL: https://issues.apache.org/jira/browse/FLINK-20743
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.11.1, 1.12.1
>Reporter: yang gang
>Assignee: yang gang
>Priority: Major
> Attachments: image-2020-12-23-15-13-21-226.png
>
>
> !image-2020-12-23-15-13-21-226.png|width=970,height=291!
>  A RemoteTransportException reminds the user which service has a problem by 
> means of IP/port.
>  When we troubleshoot a problem, this information is not precise enough. 
> Usually we need to look at the running log of the container that has the 
> problem, but by the time we see this log the container has already died, so 
> the IP/port can no longer quickly locate the specific container.
>  So I hope that when such an exception occurs, the containerId is printed as 
> well.
> E.g.:
>  Connection unexpectedly closed by remote task manager 
> 'hostName/ip:port/containerId'. This might indicate that the remote task 
> manager was lost.
>   
>  
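
A minimal hedged sketch of the requested message format, assuming the 
container ID is available where the exception is raised (names are 
illustrative, not the actual Flink code):

{code:java}
// Hedged sketch: enrich the remote-failure message with the container ID
// next to the address, in the format proposed above.
static String remoteTaskManagerLostMessage(
        String hostName, String ip, int port, String containerId) {
    return String.format(
        "Connection unexpectedly closed by remote task manager '%s/%s:%d/%s'. "
            + "This might indicate that the remote task manager was lost.",
        hostName, ip, port, containerId);
}
{code}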



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-20743) Print ContainerId For RemoteTransportException

2020-12-29 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-20743:


Assignee: yang gang

> Print ContainerId For RemoteTransportException
> --
>
> Key: FLINK-20743
> URL: https://issues.apache.org/jira/browse/FLINK-20743
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.11.1, 1.12.1
>Reporter: yang gang
>Assignee: yang gang
>Priority: Major
> Attachments: image-2020-12-23-15-13-21-226.png
>
>
> !image-2020-12-23-15-13-21-226.png|width=970,height=291!
>  A RemoteTransportException reminds the user which service has a problem by 
> means of IP/port.
>  When we troubleshoot a problem, this information is not precise enough. 
> Usually we need to look at the running log of the container that has the 
> problem, but by the time we see this log the container has already died, so 
> the IP/port can no longer quickly locate the specific container.
>  So I hope that when such an exception occurs, the containerId is printed as 
> well.
> E.g.:
>  Connection unexpectedly closed by remote task manager 
> 'hostName/ip:port/containerId'. This might indicate that the remote task 
> manager was lost.
>   
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20743) Print ContainerId For RemoteTransportException

2020-12-25 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254795#comment-17254795
 ] 

Zhijiang commented on FLINK-20743:
--

Thanks for creating this issue [~清月]. 
I remember I also encountered the same concern while debugging some failover 
issues before. I guess we might rely on the port info to trace the other 
required info, such as the container ID, from the job manager log. But indeed 
that is not very convenient or efficient. So in general I support your proposal 
for improving the debugging process, but I am a bit worried that it might not 
be easy to pass the container ID into the network stack, which might touch 
many components.

Anyway, you can try out your approach, and I can assign this ticket to you if 
you would like to contribute it. :) 

> Print ContainerId For RemoteTransportException
> --
>
> Key: FLINK-20743
> URL: https://issues.apache.org/jira/browse/FLINK-20743
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.11.1, 1.12.1
>Reporter: yang gang
>Priority: Major
> Attachments: image-2020-12-23-15-13-21-226.png
>
>
> !image-2020-12-23-15-13-21-226.png|width=970,height=291!
>  A RemoteTransportException reminds the user which service has a problem by 
> means of IP/port.
>  When we troubleshoot a problem, this information is not precise enough. 
> Usually we need to look at the running log of the container that has the 
> problem, but by the time we see this log the container has already died, so 
> the IP/port can no longer quickly locate the specific container.
>  So I hope that when such an exception occurs, the containerId is printed as 
> well.
> E.g.:
>  Connection unexpectedly closed by remote task manager 
> 'hostName/ip:port/containerId'. This might indicate that the remote task 
> manager was lost.
>   
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16753) Exception from AsyncCheckpointRunnable should be wrapped in CheckpointException

2020-11-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17231688#comment-17231688
 ] 

Zhijiang commented on FLINK-16753:
--

Thanks for the reminder [~NicoK].

I considered this concern while merging the commit. As it is not a bug and the 
improvement did not seem that sensitive to me, I only merged it to master.

But I am also fine with backporting it to previous versions if you consider it 
meaningful. :)

What is your opinion [~wind_ljy]? I am not quite sure how the code paths differ 
in the earlier versions, so could you submit separate PRs for those versions if 
we finally decide to backport this improvement?

> Exception from AsyncCheckpointRunnable should be wrapped in 
> CheckpointException
> ---
>
> Key: FLINK-16753
> URL: https://issues.apache.org/jira/browse/FLINK-16753
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0, 1.11.0, 1.11.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If an exception is thrown from a task's async checkpoint process, the 
> checkpoint will be declined as expected, but the reason for declining the 
> checkpoint will be recorded as {{CheckpointFailureReason.JOB_FAILURE}}, which 
> gives a wrong message to users.
> I think we can simply replace
> {code:java}
> owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(),
>  checkpointException);
> {code}
> with
>  
> {code:java}
> owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(),
>  new CheckpointException(CheckpointFailureReason.EXCEPTION, 
> checkpointException));
> {code}
> in {{AsyncCheckpointRunnable.handleExecutionException}}.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19907) Channel state (upstream) can be restored after emission of new elements (watermarks)

2020-11-02 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17224507#comment-17224507
 ] 

Zhijiang commented on FLINK-19907:
--

I am not quite sure about the actual semantics of the newly emitted elements 
while initializing operator state here. But I think we should consider two 
issues to guarantee correctness:

* Determinism across repeated restores: the behavior should be consistent when 
executing the state restore multiple times.
* Consistency with normal running: assuming the newly emitted elements also 
exist during a state snapshot, whatever ordering holds between them and the 
in-flight channel state should also be obeyed after restoring.

> Channel state (upstream) can be restored after emission of new elements 
> (watermarks)
> 
>
> Key: FLINK-19907
> URL: https://issues.apache.org/jira/browse/FLINK-19907
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.12.0
>
>
> In StreamTask.beforeInvoke:
> 1. 
> operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
> 2. readRecoveredChannelState();
>  But operatorChain.initializeStateAndOpenOperators can emit watermarks (or 
> potentially some other stream elements).
>  I've encountered this issue while adding an EndOfRecovery marker - in some 
> runs of OverWindowITCase.testRowTimeBoundedPartitionedRangeOver the marker 
> was emitted after the watermark.
>  
> cc: [~zjwang], [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-21 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19745.

Resolution: Fixed

Merged in master: 333eb8ebf116756911dc7e1da59489b5cd9ea475

> Supplement micro-benchmark for bounded blocking partition in remote channel 
> case
> 
>
> Key: FLINK-19745
> URL: https://issues.apache.org/jira/browse/FLINK-19745
> Project: Flink
>  Issue Type: Task
>  Components: Benchmarks, Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
>  Labels: pull-request-available
>
> The current benchmark `BlockingPartitionBenchmark` for batch jobs only 
> measures the scenario where the producer & consumer are deployed in the same 
> process, which corresponds to the local input channel on the consumer side. 
> We want to supplement another common scenario that measures the effect of 
> reading data via network shuffle, which corresponds to the remote input 
> channel on the consumer side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-20 Thread Zhijiang (Jira)
Zhijiang created FLINK-19745:


 Summary: Supplement micro-benchmark for bounded blocking partition 
in remote channel case
 Key: FLINK-19745
 URL: https://issues.apache.org/jira/browse/FLINK-19745
 Project: Flink
  Issue Type: Task
  Components: Benchmarks, Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


The current benchmark `BlockingPartitionBenchmark` for batch jobs only measures 
the scenario where the producer & consumer are deployed in the same process, 
which corresponds to the local input channel on the consumer side. 

We want to supplement another common scenario that measures the effect of 
reading data via network shuffle, which corresponds to the remote input channel 
on the consumer side.
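
For reference, a bare JMH skeleton of the shape such a benchmark could take. 
This is hedged: the class name mirrors the intent, but the real benchmark 
lives in the flink-benchmarks project and its Flink job setup is omitted here.

{code:java}
// Hedged sketch of the benchmark shape only; the actual remote-channel
// benchmark would run a batch job whose consumer reads the bounded blocking
// partition through the network stack instead of locally.
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class BlockingPartitionRemoteChannelBenchmark {

    @Benchmark
    public void remoteFilePartition() throws Exception {
        // Placeholder: run the producer, persist the bounded blocking
        // partition to disk, then consume it via remote input channels.
    }
}
{code}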



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16051) Subtask ID in Overview-Subtasks should start from 1

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-16051.

Resolution: Duplicate

> Subtask ID in Overview-Subtasks should start from 1
> ---
>
> Key: FLINK-16051
> URL: https://issues.apache.org/jira/browse/FLINK-16051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>  Labels: pull-request-available
> Attachments: backpressureui.png, checkpointui.png, taskui.png, 
> watermarkui.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The subtask ID in the Subtask UI starts from 0, which is inconsistent with 
> the other IDs in backpressure / checkpoint / watermark.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-7315) use flink's buffers in netty

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-7315.
---
Resolution: Fixed

> use flink's buffers in netty
> 
>
> Key: FLINK-7315
> URL: https://issues.apache.org/jira/browse/FLINK-7315
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> The goal of this change is to avoid the step in the channel encoder and 
> decoder pipelines where flink buffers are copied into netty buffers. Instead, 
> netty should directly send flink buffers to the network.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12909) add try catch when find a unique file name for the spilling channel

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-12909.

Resolution: Later

I will close this issue for cleanup since the contributor has not been active 
for a long time and no one will take it over at the moment. We can reopen it if 
really necessary in the future. 

> add try catch when find a unique file name for the spilling channel
> ---
>
> Key: FLINK-12909
> URL: https://issues.apache.org/jira/browse/FLINK-12909
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> h2. What is the purpose of the change
> Catch exceptions thrown due to disk loss and try again to find a unique file 
> name for the spilling channel.
> Modify the createSpillingChannel() method of the 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer
>  class to solve this problem.
>  
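
A hedged sketch of the retry shape such a fix could take (simplified 
signature; the real method lives in the deserializer class named above):

{code:java}
// Hedged sketch, not the actual Flink code: retry finding a unique spill
// file, skipping temp directories whose disk has failed.
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.UUID;

static FileChannel createSpillingChannel(File[] tempDirs) throws IOException {
    final int maxAttempts = 10;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        // Rotate through the temp directories on each attempt.
        File dir = tempDirs[attempt % tempDirs.length];
        File file = new File(dir, UUID.randomUUID() + ".inputchannel");
        try {
            return new RandomAccessFile(file, "rw").getChannel();
        } catch (FileNotFoundException e) {
            // Likely a lost disk; try the next temp directory.
        }
    }
    throw new IOException("Could not create a spilling channel in any temp directory");
}
{code}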



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16051) Subtask ID in Overview-Subtasks should start from 1

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217345#comment-17217345
 ] 

Zhijiang commented on FLINK-16051:
--

[~wind_ljy], do you think we can close this issue for cleanup now, since 
[~chesnay] raised some concerns in the respective PR before?

> Subtask ID in Overview-Subtasks should start from 1
> ---
>
> Key: FLINK-16051
> URL: https://issues.apache.org/jira/browse/FLINK-16051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>  Labels: pull-request-available
> Attachments: backpressureui.png, checkpointui.png, taskui.png, 
> watermarkui.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The subtask id in Subtask UI starts from 0 which is not consistent with other 
> ID in backpressure / checkpoint / watermark.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217343#comment-17217343
 ] 

Zhijiang edited comment on FLINK-19249 at 10/20/20, 6:50 AM:
-

I dug out the previously discussed issue 
[FLINK-16030|https://issues.apache.org/jira/browse/FLINK-16030], which might go 
in the same direction as this one.


was (Author: zjwang):
I dug out the previously discussed issue 
[FLINK-16030|https://issues.apache.org/jira/browse/FLINK-16030]which might go 
in the same direction as this one.

> Detect broken connections in case TCP Timeout takes too long.
> -
>
> Key: FLINK-19249
> URL: https://issues.apache.org/jira/browse/FLINK-19249
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> {quote}encountered this error on 1.7, after going through the master code, I 
> think the problem is still there
> {quote}
> When the network environment is not good, the connection between the server 
> and the client may be dropped unexpectedly. After the disconnection, the 
> server will receive an IOException such as the one below
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> and then release the view reader.
> But the job will not fail until the downstream detects the disconnection via 
> {{channelInactive}} later (~10 min). During that time, the job can still 
> process data, but the broken channel cannot transfer any data or events, so 
> snapshots will fail during this period. This causes the job to replay a lot 
> of data after failover.
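
One common way to detect a dead peer earlier than the OS-level TCP timeout is 
an application-level idle check, e.g. netty's IdleStateHandler. A hedged 
sketch of the general technique (not something Flink currently ships):

{code:java}
// Hedged sketch: close a channel whose peer has been silent too long,
// instead of waiting for the TCP timeout / channelInactive (~10 min).
import java.util.concurrent.TimeUnit;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;

public class IdleConnectionHandler extends ChannelInboundHandlerAdapter {

    // In the pipeline setup, fire an event after 60s without reads:
    // pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
    // pipeline.addLast(new IdleConnectionHandler());

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // Peer silent for too long: fail fast and trigger failover.
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
{code}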



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217343#comment-17217343
 ] 

Zhijiang commented on FLINK-19249:
--

I dug out the previously discussed issue 
[FLINK-16030|https://issues.apache.org/jira/browse/FLINK-16030]which might go 
in the same direction as this one.

> Detect broken connections in case TCP Timeout takes too long.
> -
>
> Key: FLINK-19249
> URL: https://issues.apache.org/jira/browse/FLINK-19249
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> {quote}encountered this error on 1.7, after going through the master code, I 
> think the problem is still there
> {quote}
> When the network environment is not good, the connection between the server 
> and the client may be dropped unexpectedly. After the disconnection, the 
> server will receive an IOException such as the one below
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> and then release the view reader.
> But the job will not fail until the downstream detects the disconnection via 
> {{channelInactive}} later (~10 min). During that time, the job can still 
> process data, but the broken channel cannot transfer any data or events, so 
> snapshots will fail during this period. This causes the job to replay a lot 
> of data after failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13802) Flink code style guide

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217340#comment-17217340
 ] 

Zhijiang commented on FLINK-13802:
--

[~azagrebin], just a reminder: do you think this umbrella ticket should be 
closed? :)

> Flink code style guide
> --
>
> Key: FLINK-13802
> URL: https://issues.apache.org/jira/browse/FLINK-13802
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Project Website
>Reporter: Andrey Zagrebin
>Priority: Major
>  Labels: codestyle
>
> This is an umbrella issue to introduce and improve Flink code style guide.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14525) buffer pool is destroyed

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-14525.

Resolution: Won't Fix

> buffer pool is destroyed
> 
>
> Key: FLINK-14525
> URL: https://issues.apache.org/jira/browse/FLINK-14525
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2
>Reporter: Saqib
>Priority: Major
>
> We have a Flink app running in standalone mode. The app runs fine in our 
> non-prod environments. However, on our prod server it throws this exception:
> Buffer pool is destroyed. 
>  
> The error is thrown as a RuntimeException on the collect call in the flatmap 
> function. The flatmap is just collecting a Tuple, and the Document is an XML 
> Document object.
>  
> As mentioned, this does not happen in the non-prod environments (and we have 
> multiple: DEV, QA, UAT). The UAT box is spec-ed exactly like our prod host 
> with 4 CPUs. The Java version is the same too.
>  
> Not sure how to proceed.
>  
> Thanks
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14525) buffer pool is destroyed

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217338#comment-17217338
 ] 

Zhijiang commented on FLINK-14525:
--

Closing this issue for cleanup, since the reporter has not been responsive for 
a long time and the affected version is no longer maintained.

> buffer pool is destroyed
> 
>
> Key: FLINK-14525
> URL: https://issues.apache.org/jira/browse/FLINK-14525
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2
>Reporter: Saqib
>Priority: Major
>
> We have a Flink app running in standalone mode. The app runs fine in our 
> non-prod environments. However, on our prod server it throws this exception:
> Buffer pool is destroyed. 
>  
> The error is thrown as a RuntimeException on the collect call in the flatmap 
> function. The flatmap is just collecting a Tuple, and the Document is an XML 
> Document object.
>  
> As mentioned, this does not happen in the non-prod environments (and we have 
> multiple: DEV, QA, UAT). The UAT box is spec-ed exactly like our prod host 
> with 4 CPUs. The Java version is the same too.
>  
> Not sure how to proceed.
>  
> Thanks
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15981) Control the direct memory in FileChannelBoundedData.FileBufferReader

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-15981:
-
Fix Version/s: 1.12.0

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> 
>
> Key: FLINK-15981
> URL: https://issues.apache.org/jira/browse/FLINK-15981
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: Jingsong Lee
>Assignee: Zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, the default blocking BoundedData is FileChannelBoundedData. Its 
> reader creates a new 64KB direct buffer.
> When parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid a direct memory OOM. It is 
> hard to configure, and it costs a lot of memory. With a parallelism of 1000, 
> we may need 1GB+ per task manager.
> This is bad for the scenario of few slots and large parallelism. Batch jobs 
> could run little by little, but the memory shortage would cost a lot.
> If we provide N-input operators, things may get worse, because the number of 
> subpartitions that can be requested at the same time will grow, and we have 
> no idea how much memory that needs.
> Here are my rough thoughts (a sketch of the first one follows after this 
> list):
>  * Obtain the memory from the network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time".
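
A hedged sketch of the first thought, with made-up class names: read buffers 
come from a small pre-allocated shared pool, so the memory is bounded and 
accounted for, and the blocking take() naturally caps how many subpartitions 
can be read at the same time (the second thought).

{code:java}
// Hedged sketch, not Flink's actual classes: borrow file-read buffers from a
// shared pool instead of allocating a fresh 64KB direct buffer per reader.
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class SharedReadBufferPool {

    private final BlockingQueue<ByteBuffer> buffers;

    SharedReadBufferPool(int numBuffers, int bufferSize) {
        this.buffers = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            buffers.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Blocks when all buffers are in use, capping concurrent readers. */
    ByteBuffer take() throws InterruptedException {
        return buffers.take();
    }

    void recycle(ByteBuffer buffer) {
        buffer.clear();
        buffers.add(buffer);
    }
}

// usage in a reader:
//   ByteBuffer buf = pool.take();
//   try { fileChannel.read(buf); /* hand data downstream */ }
//   finally { pool.recycle(buf); }
{code}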



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15981) Control the direct memory in FileChannelBoundedData.FileBufferReader

2020-10-20 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-15981:


Assignee: Zhijiang

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> 
>
> Key: FLINK-15981
> URL: https://issues.apache.org/jira/browse/FLINK-15981
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: Jingsong Lee
>Assignee: Zhijiang
>Priority: Critical
>  Labels: pull-request-available
>
> Currently, the default blocking BoundedData is FileChannelBoundedData. Its 
> reader creates a new 64KB direct buffer.
> When parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid a direct memory OOM. It is 
> hard to configure, and it costs a lot of memory. With a parallelism of 1000, 
> we may need 1GB+ per task manager.
> This is bad for the scenario of few slots and large parallelism. Batch jobs 
> could run little by little, but the memory shortage would cost a lot.
> If we provide N-input operators, things may get worse, because the number of 
> subpartitions that can be requested at the same time will grow, and we have 
> no idea how much memory that needs.
> Here are my rough thoughts:
>  * Obtain the memory from the network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18427) Job failed under java 11

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217331#comment-17217331
 ] 

Zhijiang commented on FLINK-18427:
--

[~xintongsong] might be more familiar with the memory-setting concerns 
[~simahao] raised above.

> Job failed under java 11
> 
>
> Key: FLINK-18427
> URL: https://issues.apache.org/jira/browse/FLINK-18427
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / Network
>Affects Versions: 1.10.0
>Reporter: Zhang Hao
>Priority: Critical
> Attachments: image-2020-06-29-13-49-17-756.png
>
>
> flink version:1.10.0
> deployment mode:cluster
> os:linux redhat7.5
> Job parallelism:greater than 1
> My job runs normally under Java 8 but fails under Java 11. The exception info 
> is below; netty failed to send a message. In addition, I found the job only 
> fails when tasks are distributed across multiple nodes; if I set the job's 
> parallelism to 1, the job runs normally under Java 11 too.
>  
> 2020-06-24 09:52:162020-06-24 
> 09:52:16org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>  Sending the partition request to '/170.0.50.19:33320' failed. at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:124)
>  at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:115)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:818)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:718)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:708)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:56)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1102)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1149)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1073)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> java.io.IOException: Error while serializing message: 
> PartitionRequest(8059a0b47f7ba0ff814ea52427c584e7@6750c1170c861176ad3ceefe9b02f36e:0:2)
>  at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:177)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
>  ... 11 moreCaused by: java.io.IOException: java.lang.OutOfMemoryError: 
> Direct buffer memory at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:497)
>  at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:174)
>  ... 12 moreCaused by: java.lang.OutOfMemoryError: Direct buffer memory at 
> 

[jira] [Comment Edited] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216520#comment-17216520
 ] 

Zhijiang edited comment on FLINK-19596 at 10/19/20, 8:04 AM:
-

I heard this issue was proposed a long time ago by [~yunta]; maybe he knows 
some more background.

was (Author: zjwang):
I heard this issue was proposed a long time ago by [~tangyun]; maybe he knows 
some more background.

> Do not recover CompletedCheckpointStore on each failover
> 
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means the {{completedCheckpointStore.recover()}} would be called tens 
> of times since the tasks are not in a failover region.
> And I notice there is a "TODO" in the source codes:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216520#comment-17216520
 ] 

Zhijiang commented on FLINK-19596:
--

I heard this issue was proposed a long time ago by [~tangyun]; maybe he knows 
some more background.

> Do not recover CompletedCheckpointStore on each failover
> 
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means the {{completedCheckpointStore.recover()}} would be called tens 
> of times since the tasks are not in a failover region.
> And I notice there is a "TODO" in the source codes:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216517#comment-17216517
 ] 

Zhijiang commented on FLINK-19596:
--

Thanks for proposing this potential improvement [~wind_ljy]!

+1 for the motivation. 

If I understood correctly, 
`ZooKeeperCompletedCheckpointStore#completedCheckpoints` is not loaded during 
class construction (lazy loading), and it is updated in an append-only way 
during subsequent `CheckpointCoordinator#completePendingCheckpoint` calls. 

Once `CheckpointCoordinator#restoreLatestCheckpointedStateInternal` is called, 
it lazily loads all the completed checkpoints via 
`CompletedCheckpointStore#recover`. So there is some duplicate overhead here: 
already completed checkpoints are read twice.

Regarding the solution, I am not quite sure whether we can couple this decision 
(CompletedCheckpointStore#recover) with global/local/regional failover. E.g. a 
JM leader change causes a global failover now, but if we improve that in the 
future, it might not need a job restart thanks to reconciling.  

If we can refactor 
`completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery)` 
to cover the internal logic of `completedCheckpointStore.recover()`, that might 
make more sense. I mean we only care about getting the latest checkpoint via 
the `#getLatestCheckpoint` interface, and its internal implementation can judge 
whether it further needs to call `#recover` to load the previously completed 
checkpoints or not.
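
A sketch of that suggested shape, with the recover step pulled inside and only 
executed when the in-memory view may be stale; `needsRecover()` and 
`selectLatest(...)` are hypothetical helpers, not existing methods:

{code:java}
// Hedged sketch of the suggested refactoring, not a patch: the store decides
// internally whether the costly #recover (ZK/HDFS reads) is needed, e.g. on
// first access or after a JM leader change, so plain task failovers reuse
// the in-memory completed checkpoints.
public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery)
        throws Exception {
    if (needsRecover()) {
        recover(); // loads the completed checkpoints from the store
    }
    if (completedCheckpoints.isEmpty()) {
        return null;
    }
    // the existing preference logic between checkpoints and savepoints
    return selectLatest(completedCheckpoints, isPreferCheckpointForRecovery);
}
{code}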

> Do not recover CompletedCheckpointStore on each failover
> 
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means the {{completedCheckpointStore.recover()}} would be called tens 
> of times since the tasks are not in a failover region.
> And I notice there is a "TODO" in the source codes:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19155) ResultPartitionTest is unstable

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19155.

Resolution: Won't Fix

Closing it since this unit test has already been removed in another 
refactoring effort.

> ResultPartitionTest is unstable
> ---
>
> Key: FLINK-19155
> URL: https://issues.apache.org/jira/browse/FLINK-19155
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Aljoscha Krettek
>Assignee: Zhijiang
>Priority: Major
>  Labels: test-stability
>
> I saw a failure in {{testInitializeMoreStateThanBuffer()}}: 
> https://dev.azure.com/aljoschakrettek/Flink/_build/results?buildId=274=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374=5995



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15963) Reuse the same ByteBuf while writing the BufferResponse header

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-15963.

Resolution: Later

Closing it for cleanup since it is not really critical for now and nobody has 
time to work on it. 
We can reopen it if really necessary in the future.

> Reuse the same ByteBuf while writing the BufferResponse header
> --
>
> Key: FLINK-15963
> URL: https://issues.apache.org/jira/browse/FLINK-15963
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Zhijiang
>Priority: Major
>
> On the sender side, while writing a BufferResponse message, a new direct 
> ByteBuf is always allocated from the netty allocator to write the header part 
> of every message.
> Maybe we can maintain a fixed ByteBuf per channel and use it to write the 
> header part of all BufferResponse messages, then verify how that affects the 
> performance/memory overhead in e2e tests/benchmarks.
> This improvement is also related to another debate about whether it is 
> feasible to suspend writing when the socket is not writable at the moment. If 
> the socket is temporarily not writable, the server handler will continue 
> writing & flushing BufferResponse messages until reaching the maximum 
> watermark in the netty stack, and only the header ByteBuf would occupy the 
> netty watermark at that moment.
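
A hedged sketch of the per-channel reuse idea; the header layout is 
simplified, only the netty calls are real, and the reuse-safety caveat in the 
comments is exactly the suspend-writing question raised above:

{code:java}
// Hedged sketch of the idea, not working code: one direct ByteBuf per channel
// is reused for every BufferResponse header. As the ticket notes, this is
// only safe once the previous header has really been written out (otherwise
// clear() would race with a pending flush), which ties into the "suspend
// writing when the socket is not writable" discussion.
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;

final class PerChannelHeaderBuffer {

    private static final int HEADER_LENGTH = 13; // placeholder length

    private final ByteBuf header = Unpooled.directBuffer(HEADER_LENGTH);

    void writeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf body) {
        header.clear();
        header.writeInt(body.readableBytes()); // simplified header fields
        // retainedDuplicate(): independent indices over the same memory,
        // so netty's release after the write does not free our buffer.
        ctx.write(header.retainedDuplicate());
        ctx.writeAndFlush(body);
    }
}
{code}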



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16739) PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails with no such key

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-16739.

Resolution: Cannot Reproduce

Closing it for cleanup since it has not been reproduced in the past half year.

> PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails with no such key
> --
>
> Key: FLINK-16739
> URL: https://issues.apache.org/jira/browse/FLINK-16739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> Build: 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6546=logs=e9af9cde-9a65-5281-a58e-2c8511d36983=df5b2bf5-bcff-5dc9-7626-50bed0866a82]
> logs
> {code:java}
> 2020-03-24T01:51:19.6988685Z [INFO] Running 
> org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase
> 2020-03-24T01:51:21.6250893Z [INFO] Running 
> org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase
> 2020-03-24T01:51:25.1626385Z [WARNING] Tests run: 8, Failures: 0, Errors: 0, 
> Skipped: 2, Time elapsed: 5.461 s - in 
> org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase
> 2020-03-24T01:51:50.5503712Z [ERROR] Tests run: 7, Failures: 1, Errors: 1, 
> Skipped: 0, Time elapsed: 28.922 s <<< FAILURE! - in 
> org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase
> 2020-03-24T01:51:50.5506010Z [ERROR] testSimpleFileWriteAndRead[Scheme = 
> s3p](org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase)  Time elapsed: 
> 0.7 s  <<< ERROR!
> 2020-03-24T01:51:50.5513057Z 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>  com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does 
> not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; 
> Request ID: A07D70A474EABC13; S3 Extended Request ID: 
> R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=),
>  S3 Extended Request ID: 
> R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA= 
> (Path: s3://***/temp/tests-c79a578b-13d9-41ba-b73b-4f53fc965b96/test.txt)
> 2020-03-24T01:51:50.5517642Z Caused by: 
> com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not 
> exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request 
> ID: A07D70A474EABC13; S3 Extended Request ID: 
> R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=)
> 2020-03-24T01:51:50.5519791Z 
> 2020-03-24T01:51:50.5520679Z [ERROR] 
> org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase  Time elapsed: 17.431 s 
>  <<< FAILURE!
> 2020-03-24T01:51:50.5521841Z java.lang.AssertionError: expected: but 
> was:
> 2020-03-24T01:51:50.5522437Z 
> 2020-03-24T01:51:50.8966641Z [INFO] 
> 2020-03-24T01:51:50.8967386Z [INFO] Results:
> 2020-03-24T01:51:50.8967849Z [INFO] 
> 2020-03-24T01:51:50.8968357Z [ERROR] Failures: 
> 2020-03-24T01:51:50.8970933Z [ERROR]   
> PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.teardown:155->AbstractHadoopFileSystemITTest.checkPathExistence:61
>  expected: but was:
> 2020-03-24T01:51:50.8972311Z [ERROR] Errors: 
> 2020-03-24T01:51:50.8973807Z [ERROR]   
> PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.testSimpleFileWriteAndRead:87
>  » UnrecoverableS3Operation
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17741) Create integration or e2e test for out of order (savepoint) barriers

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-17741:
-
Parent: FLINK-19442
Issue Type: Sub-task  (was: Task)

> Create integration or e2e test for out of order (savepoint) barriers
> 
>
> Key: FLINK-17741
> URL: https://issues.apache.org/jira/browse/FLINK-17741
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Roman Khachatryan
>Priority: Major
> Fix For: 1.12.0
>
>
> Unaligned checkpoints MVP assumes at most one barrier at a time. This is 
> ensured by max-concurrent-checkpoints on the CheckpointCoordinator.
> But it seems possible that the CheckpointCoordinator considers a checkpoint 
> canceled and starts a new one while some old barriers are still flowing in 
> the graph.
> As of now, this situation isn't tested.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15187) Reuse LocalBufferPool for FileBufferReader in blocking partition

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-15187.

Resolution: Fixed

Resolved in FLINK-15981

> Reuse LocalBufferPool for FileBufferReader in blocking partition
> 
>
> Key: FLINK-15187
> URL: https://issues.apache.org/jira/browse/FLINK-15187
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: Zhijiang
>Priority: Minor
>
> If the file type is chosen via 
> `taskmanager.network.bounded-blocking-subpartition-type` for a batch job, then 
> creating the respective view for reading the subpartition's persistent data 
> allocates two unpooled memory segments for every subpartition. This portion 
> of temporary memory is neither managed nor accounted for by the framework, so 
> it might raise OOM concerns.
> We could instead reuse the ResultPartition's `LocalBufferPool` to read 
> subpartition data and avoid this memory overhead. But there are two 
> additional problems with reusing it directly. 
>  * The current core size of `LocalBufferPool` is `numberOfSubpartitions + 1`, 
> but every subpartition currently needs two segments for pre-reading. We can 
> remove the pre-reading to make the current core pool size fit the reading 
> requirements, because pre-reading seems to have no obvious benefit in 
> practice; it only affects the last piece of data.
>  * When the task finishes, it destroys the `LocalBufferPool` even though the 
> respective `ResultPartition` is still alive, so the following subpartition 
> view cannot reuse the pool directly. We should adjust the respective logic to 
> either delay destroying the pool or create a new pool for the subpartition 
> view.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17741) Create integration or e2e test for out of order (savepoint) barriers

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-17741:
-
Parent: (was: FLINK-14551)
Issue Type: Task  (was: Sub-task)

> Create integration or e2e test for out of order (savepoint) barriers
> 
>
> Key: FLINK-17741
> URL: https://issues.apache.org/jira/browse/FLINK-17741
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Reporter: Roman Khachatryan
>Priority: Major
> Fix For: 1.12.0
>
>
> Unaligned checkpoints MVP assumes at most one barrier at a time. This is 
> ensured by max-concurrent-checkpoints on CheckpointCoordinator.
> But it seems possible that the CheckpointCoordinator considers a checkpoint 
> canceled and starts a new one while some old barriers are still flowing in 
> the graph.
> As of now, this situation isn't tested.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14551) Unaligned checkpoints

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-14551.

Resolution: Fixed

> Unaligned checkpoints
> -
>
> Key: FLINK-14551
> URL: https://issues.apache.org/jira/browse/FLINK-14551
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: Zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is the umbrella issue for the feature of unaligned checkpoints. Refer to 
> the 
> [FLIP-76|https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints]
>  for more details.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14551) Unaligned checkpoints

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-14551:
-
Fix Version/s: (was: 1.12.0)
   1.11.0

> Unaligned checkpoints
> -
>
> Key: FLINK-14551
> URL: https://issues.apache.org/jira/browse/FLINK-14551
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: Zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is the umbrella issue for the feature of unaligned checkpoints. Refer to 
> the 
> [FLIP-76|https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints]
>  for more details.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers

2020-10-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17212929#comment-17212929
 ] 

Zhijiang commented on FLINK-9253:
-

[~NicoK] I just closed the parent umbrella ticket for cleanup. Do you think we 
should close this issue as well? It has not been touched for a long time, and I 
am not quite sure whether it is worth focusing on in the future.
 

> Make buffer count per InputGate always #channels*buffersPerChannel + 
> ExclusiveBuffers
> -
>
> Key: FLINK-9253
> URL: https://issues.apache.org/jira/browse/FLINK-9253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The credit-based flow control path assigns exclusive buffers only to remote 
> channels (which makes sense since local channels don't use any buffers of 
> their own). However, this makes it non-transparent how much data may be held 
> in buffers, since that depends on the actual schedule of the job rather than 
> the job graph.
> By adapting the floating buffers to use a maximum of 
> {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, 
> we would be channel-type agnostic and keep the old behaviour.
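> A minimal arithmetic sketch of the proposed formula (the numbers are 
> hypothetical; only the two config option names in the comments are real 
> Flink options):
> {code:java}
> // Per-gate buffer math for the proposed floating-buffer cap.
> public class FloatingBufferMath {
>     public static void main(String[] args) {
>         int channels = 8;               // all input channels of the gate
>         int remoteChannels = 6;         // only remote channels hold exclusive buffers
>         int buffersPerChannel = 2;      // taskmanager.network.memory.buffers-per-channel
>         int floatingBuffersPerGate = 8; // taskmanager.network.memory.floating-buffers-per-gate
> 
>         int exclusiveBuffers = remoteChannels * buffersPerChannel;      // 12
>         int floatingBuffers =
>                 channels * buffersPerChannel + floatingBuffersPerGate
>                         - exclusiveBuffers;                             // 12
> 
>         // The gate total no longer depends on the local/remote split:
>         // 12 + 12 == 8 * 2 + 8 == 24 for any schedule of this job graph.
>         System.out.println(exclusiveBuffers + floatingBuffers);
>     }
> }
> {code}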



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-7282) Credit-based Network Flow Control

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-7282.
---
Resolution: Fixed

> Credit-based Network Flow Control
> -
>
> Key: FLINK-7282
> URL: https://issues.apache.org/jira/browse/FLINK-7282
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
>
> This is part of the network stack improvements proposed in [~StephanEwen]'s 
> [FLIP|https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#]
> Backpressure currently happens naturally through the TCP connections and the 
> bounded buffering capacity. The downsides are:
> * All channels multiplexed into the same TCP connection stall together as 
> soon as one channel has backpressure.
> * Under backpressure, connections cannot transport checkpoint barriers.
> This Flink-managed flow control is similar to the window-based advertisement 
> mechanism in TCP. The basic approach is the following:
> * Each RemoteInputChannel has fixed exclusive buffers as initial credits, and 
> SingleInputGate has a fixed buffer pool managing floating buffers for all 
> RemoteInputChannels.
> * The RemoteInputChannel, as the receiver, announces its currently available 
> credits to the sender side.
> * Senders must never send buffers without credit; this means every buffer 
> sent can be accepted by the receiver, so no buffers accumulate on the network 
> wire.
> * Senders also send the current size of the backlog, which indicates how many 
> buffers are waiting on the sender side. The receivers use this information to 
> decide how to request floating buffers from the fixed buffer pool.
> To avoid immediate commits affecting the master branch, the work will be done 
> on a separate feature branch (a sketch of the credit accounting follows 
> below).
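> For illustration, a minimal sketch of the receiver-side credit accounting 
> described above (class and method names are hypothetical, not Flink API):
> {code:java}
> // Receiver-side credit bookkeeping, drained by the netty thread.
> public class CreditState {
>     private int unannouncedCredit; // credits gained but not yet sent upstream
>     private int senderBacklog;     // backlog size last announced by the sender
> 
>     // Called when the receiver recycles a buffer (exclusive or floating):
>     // one freed buffer equals one new credit for the sender.
>     public synchronized void onBufferRecycled() {
>         unannouncedCredit++;
>     }
> 
>     // Called when a buffer arrives carrying the sender's backlog; the
>     // receiver uses it to request floating buffers from the fixed pool.
>     public synchronized void onSenderBacklog(int backlog) {
>         senderBacklog = backlog;
>     }
> 
>     public synchronized int getSenderBacklog() {
>         return senderBacklog;
>     }
> 
>     // Announced to the sender in one message, so the sender never transmits
>     // a buffer for which it holds no credit.
>     public synchronized int pollUnannouncedCredit() {
>         int credit = unannouncedCredit;
>         unannouncedCredit = 0;
>         return credit;
>     }
> }
> {code}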



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-13478.

Resolution: Won't Fix

This issue is out of date since the partition release logic has changed a lot 
since it was filed.
Closing it for cleanup; we can review it again in the future if really 
necessary.

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Minor
>
> We have two basic release strategies at the moment. One is based on 
> consumption, via network notification from the consumer. The other is based 
> on notification via RPC from the JM/scheduler.
> But in the current implementation of {{BoundedBlockingSubpartition}}, these 
> two paths are somewhat coupled. If the JM decides to release the partition 
> and sends the release RPC call, it has to wait until all the reader views are 
> really released before finally closing the data file. So the JM-RPC-based 
> release strategy still relies on consumption confirmation via the network to 
> some extent.
> To make the two release strategies independent: if the release call comes 
> from the JM/scheduler RPC, we could immediately release all the view readers 
> and then close the data file as well; if the release is based on consumption 
> confirmation, then after all the view readers of one subpartition are 
> released, the subpartition could notify the parent {{ResultPartition}}, which 
> decides whether to release the whole partition or not (see the sketch below).
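> A sketch of the proposed decoupling (hypothetical names, not the actual 
> Flink classes):
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
> 
> class BlockingSubpartitionSketch {
>     private final Set<Object> readers = new HashSet<>();
>     private final Runnable notifyParentConsumed;
> 
>     BlockingSubpartitionSketch(Runnable notifyParentConsumed) {
>         this.notifyParentConsumed = notifyParentConsumed;
>     }
> 
>     synchronized void addReader(Object reader) {
>         readers.add(reader);
>     }
> 
>     // Path 1: release via JM/scheduler RPC - no waiting for consumers.
>     synchronized void releaseByScheduler() {
>         readers.clear();  // release all view readers immediately
>         closeDataFile();  // the data file can be closed right away
>     }
> 
>     // Path 2: release driven by consumption confirmation from the network.
>     synchronized void onReaderReleased(Object reader) {
>         if (readers.remove(reader) && readers.isEmpty()) {
>             // The parent ResultPartition decides whether to release the
>             // whole partition, e.g. by counting consumed subpartitions.
>             notifyParentConsumed.run();
>         }
>     }
> 
>     private void closeDataFile() { /* close the underlying bounded file */ }
> }
> {code}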



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-13493.

Resolution: Later

Closing this issue for cleanup since it is not really important at the moment; 
we can reopen it in the future once needed.

> BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the 
> readers are empty
> ---
>
> Key: FLINK-13493
> URL: https://issues.apache.org/jira/browse/FLINK-13493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Minor
>
> In the previous implementation, the {{ResultPartition}} was always notified 
> of a consumed subpartition whenever any reader view was released. With the 
> reference-counter release strategy this can cause problems if one blocking 
> subpartition has multiple readers: the whole result partition might be 
> released while there are still live readers on some subpartitions.
> The default release strategy for blocking partitions is currently based on 
> JM/scheduler notification, but switching the option to consumption-based 
> notification would cause problems. Either way, the subpartition should behave 
> correctly regardless of which release strategy the upper layer uses.
> To fix this bug, the {{BoundedBlockingSubpartition}} only notifies 
> {{onConsumedSubpartition}} when all the readers are empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16428) Fine-grained network buffer management for backpressure

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-16428:


Assignee: (was: Zhijiang)

> Fine-grained network buffer management for backpressure
> ---
>
> Key: FLINK-16428
> URL: https://issues.apache.org/jira/browse/FLINK-16428
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Zhijiang
>Priority: Critical
> Fix For: 1.12.0
>
>
> It is an umbrella ticket for tracing the progress of this improvement.
> This is the second ingredient to solve the “checkpoints under backpressure” 
> problem (together with unaligned checkpoints). It consists of two steps:
>  * See if we can use less network memory in general for streaming jobs (with 
> potentially different distribution of floating buffers in the input side)
>  * Under backpressure, reduce network memory to have less in-flight data 
> (needs specification of algorithm and experiments)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16753) Exception from AsyncCheckpointRunnable should be wrapped in CheckpointException

2020-10-13 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-16753.

Resolution: Fixed

Merged in master: a1b1dc95210e99e6c23069415283fce4ec3793a5

> Exception from AsyncCheckpointRunnable should be wrapped in 
> CheckpointException
> ---
>
> Key: FLINK-16753
> URL: https://issues.apache.org/jira/browse/FLINK-16753
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0, 1.11.0, 1.11.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If an exception is thrown from task's async checkpoint process, the 
> checkpoint will be declined as expected, but the reason for declining 
> checkpoint will be regarded as {{CheckpointFailureReason.JOB_FAILURE}}, which 
> gives a wrong message to users.
> I think we can simply replace
> {code:java}
> owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(),
>  checkpointException);
> {code}
> with
>  
> {code:java}
> owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(),
>  new CheckpointException(CheckpointFailureReason.EXCEPTION, 
> checkpointException));
> {code}
> in {{AsyncCheckpointRunnable.handleExecutionException}}.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10653) Introduce Pluggable Shuffle Service Architecture

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-10653:
-
Fix Version/s: 1.9.0

> Introduce Pluggable Shuffle Service Architecture
> 
>
> Key: FLINK-10653
> URL: https://issues.apache.org/jira/browse/FLINK-10653
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
> Fix For: 1.9.0
>
>
> This is the umbrella issue for improving shuffle architecture.
> Shuffle is the process of data transfer between stages, which involves 
> writing outputs on the sender side and reading data on the receiver side. In 
> the Flink implementation it covers three parts (writer, transport layer, and 
> reader) which are unified for both streaming and batch jobs.
> In detail, the current ResultPartitionWriter interface on the upstream side 
> only supports in-memory outputs for streaming jobs and local persistent file 
> outputs for batch jobs. If we implement another writer such as a DfsWriter, 
> RdmaWriter, SortMergeWriter, etc. based on the ResultPartitionWriter 
> interface, there is no unified mechanism for extending the reader side 
> accordingly.
> To make the shuffle architecture more flexible and support more scenarios, 
> especially for batch jobs, a high-level shuffle architecture is necessary to 
> manage and extend both the writer and reader sides together.
> Refer to the design doc for more details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-10653) Introduce Pluggable Shuffle Service Architecture

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-10653.

Resolution: Fixed

[~trohrmann], thanks for the kind reminder :).
I have migrated all the pending open issues to a new follow-up umbrella 
ticket, FLINK-19551, for future tracking.

> Introduce Pluggable Shuffle Service Architecture
> 
>
> Key: FLINK-10653
> URL: https://issues.apache.org/jira/browse/FLINK-10653
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
>
> This is the umbrella issue for improving shuffle architecture.
> Shuffle is the process of data transfer between stages, which involves 
> writing outputs on the sender side and reading data on the receiver side. In 
> the Flink implementation it covers three parts (writer, transport layer, and 
> reader) which are unified for both streaming and batch jobs.
> In detail, the current ResultPartitionWriter interface on the upstream side 
> only supports in-memory outputs for streaming jobs and local persistent file 
> outputs for batch jobs. If we implement another writer such as a DfsWriter, 
> RdmaWriter, SortMergeWriter, etc. based on the ResultPartitionWriter 
> interface, there is no unified mechanism for extending the reader side 
> accordingly.
> To make the shuffle architecture more flexible and support more scenarios, 
> especially for batch jobs, a high-level shuffle architecture is necessary to 
> manage and extend both the writer and reader sides together.
> Refer to the design doc for more details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12873) Create a separate maven module for Shuffle API

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12873:
-
Parent: (was: FLINK-10653)
Issue Type: Task  (was: Sub-task)

> Create a separate maven module for Shuffle API
> --
>
> Key: FLINK-12873
> URL: https://issues.apache.org/jira/browse/FLINK-12873
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Configuration, Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Major
>
> At the moment, the shuffle service API is part of the flink-runtime maven 
> module. Implementers of other shuffle services therefore have to depend on 
> the fat flink-runtime dependency. We should consider factoring the shuffle 
> API interfaces out into a separate maven module which depends only on 
> flink-core. Later we can consider the same for the custom high-availability 
> services.
> The final structure could be, e.g. (up for discussion):
>  * flink-runtime (already includes default shuffle and high availability 
> implementations)
>  * flink-runtime-extensions
>  ** flink-runtime-extensions-core
>  ** flink-shuffle-extensions-api
>  ** flink-high-availability-extensions-api



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12873) Create a separate maven module for Shuffle API

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12873:
-
Parent: FLINK-19551
Issue Type: Sub-task  (was: Task)

> Create a separate maven module for Shuffle API
> --
>
> Key: FLINK-12873
> URL: https://issues.apache.org/jira/browse/FLINK-12873
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Major
>
> At the moment, the shuffle service API is part of the flink-runtime maven 
> module. Implementers of other shuffle services therefore have to depend on 
> the fat flink-runtime dependency. We should consider factoring the shuffle 
> API interfaces out into a separate maven module which depends only on 
> flink-core. Later we can consider the same for the custom high-availability 
> services.
> The final structure could be, e.g. (up for discussion):
>  * flink-runtime (already includes default shuffle and high availability 
> implementations)
>  * flink-runtime-extensions
>  ** flink-runtime-extensions-core
>  ** flink-shuffle-extensions-api
>  ** flink-high-availability-extensions-api



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12731) Load shuffle service implementations from plugin manager

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12731:
-
Parent: (was: FLINK-10653)
Issue Type: Task  (was: Sub-task)

> Load shuffle service implementations from plugin manager
> 
>
> Key: FLINK-12731
> URL: https://issues.apache.org/jira/browse/FLINK-12731
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Priority: Major
>
> The simple way to load a shuffle service is to load it from the class path 
> with the default class loader. Additionally, shuffle service implementations 
> can be loaded as plugins with their own class loaders using the PluginManager.
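> A minimal sketch of the "simple" class-path loading using the standard 
> {{java.util.ServiceLoader}} SPI ({{ShuffleServiceFactory}} here is just an 
> illustrative stand-in for the shuffle service entry point; plugin-based 
> loading would additionally isolate each implementation behind its own class 
> loader):
> {code:java}
> import java.util.ServiceLoader;
> 
> public final class ShuffleServiceLoading {
> 
>     /** Illustrative SPI; implementations are listed in META-INF/services. */
>     public interface ShuffleServiceFactory {
>         String name();
>     }
> 
>     public static ShuffleServiceFactory load(String wantedName) {
>         // Uses the default (thread context) class loader, i.e. the class path.
>         for (ShuffleServiceFactory factory
>                 : ServiceLoader.load(ShuffleServiceFactory.class)) {
>             if (factory.name().equals(wantedName)) {
>                 return factory;
>             }
>         }
>         throw new IllegalStateException("No shuffle service named " + wantedName);
>     }
> }
> {code}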



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12731) Load shuffle service implementations from plugin manager

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12731:
-
Parent: FLINK-19551
Issue Type: Sub-task  (was: Task)

> Load shuffle service implementations from plugin manager
> 
>
> Key: FLINK-12731
> URL: https://issues.apache.org/jira/browse/FLINK-12731
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Priority: Major
>
> The simple way to load a shuffle service is to load it from the class path 
> with the default class loader. Additionally, shuffle service implementations 
> can be loaded as plugins with their own class loaders using the PluginManager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12543) Consider naming convention for config options of shuffle services

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12543:
-
Parent: FLINK-19551
Issue Type: Sub-task  (was: Task)

> Consider naming convention for config options of shuffle services
> -
>
> Key: FLINK-12543
> URL: https://issues.apache.org/jira/browse/FLINK-12543
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12543) Consider naming convention for config options of shuffle services

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12543:
-
Parent: (was: FLINK-10653)
Issue Type: Task  (was: Sub-task)

> Consider naming convention for config options of shuffle services
> -
>
> Key: FLINK-12543
> URL: https://issues.apache.org/jira/browse/FLINK-12543
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13642) Refactor ShuffleMaster to optionally provide preferred TM location for produced partitions

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-13642:
-
Parent: FLINK-19551
Issue Type: Sub-task  (was: Task)

> Refactor ShuffleMaster to optionally provide preferred TM location for 
> produced partitions
> --
>
> Key: FLINK-13642
> URL: https://issues.apache.org/jira/browse/FLINK-13642
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12110) Introduce API point for ShuffleEnvironment to get estimation of locally required memory

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12110:
-
Parent: (was: FLINK-10653)
Issue Type: Task  (was: Sub-task)

> Introduce API point for ShuffleEnvironment to get estimation of locally 
> required memory
> ---
>
> Key: FLINK-12110
> URL: https://issues.apache.org/jira/browse/FLINK-12110
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Major
>
> At the moment, we use the static method TaskManagerServices.calculateHeapSizeMB 
> to estimate the local memory needed in the TM container for the network 
> environment. As the network environment becomes a pluggable shuffle service, 
> we should also make this memory request from the container to the shuffle 
> service pluggable.
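> A sketch of what the pluggable estimation hook could look like (the interface 
> name, method name, and signature are assumptions, not an actual Flink 
> interface):
> {code:java}
> public interface ShuffleMemoryEstimator {
> 
>     /**
>      * Pluggable replacement for the static
>      * TaskManagerServices.calculateHeapSizeMB logic: each shuffle
>      * implementation reports how much local memory it needs inside the
>      * TM container for the given buffer configuration.
>      */
>     long estimateLocalMemoryMB(int numNetworkBuffers, int bufferSizeBytes);
> 
>     /** Default netty-style estimate: total buffer bytes, rounded up to MB. */
>     default long defaultEstimateMB(int numNetworkBuffers, int bufferSizeBytes) {
>         long bytes = (long) numNetworkBuffers * bufferSizeBytes;
>         return (bytes + (1 << 20) - 1) >> 20;
>     }
> }
> {code}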



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12110) Introduce API point for ShuffleEnvironment to get estimation of locally required memory

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-12110:
-
Parent: FLINK-19551
Issue Type: Sub-task  (was: Task)

> Introduce API point for ShuffleEnvironment to get estimation of locally 
> required memory
> ---
>
> Key: FLINK-12110
> URL: https://issues.apache.org/jira/browse/FLINK-12110
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Major
>
> At the moment, we use the static method TaskManagerServices.calculateHeapSizeMB 
> to estimate the local memory needed in the TM container for the network 
> environment. As the network environment becomes a pluggable shuffle service, 
> we should also make this memory request from the container to the shuffle 
> service pluggable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13642) Refactor ShuffleMaster to optionally provide preferred TM location for produced partitions

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-13642:
-
Parent: (was: FLINK-10653)
Issue Type: Task  (was: Sub-task)

> Refactor ShuffleMaster to optionally provide preferred TM location for 
> produced partitions
> --
>
> Key: FLINK-13642
> URL: https://issues.apache.org/jira/browse/FLINK-13642
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19551) Follow up improvements for shuffle service

2020-10-09 Thread Zhijiang (Jira)
Zhijiang created FLINK-19551:


 Summary: Follow up improvements for shuffle service 
 Key: FLINK-19551
 URL: https://issues.apache.org/jira/browse/FLINK-19551
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Network
Reporter: Zhijiang


After resolving the core architecture and functionality of the pluggable 
shuffle service proposed in FLINK-10653, there are still some pending 
follow-up issues, which will be tracked in this umbrella ticket with low 
priority.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-12110) Introduce API point for ShuffleEnvironment to get estimation of locally required memory

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-12110:


Assignee: (was: Zhijiang)

> Introduce API point for ShuffleEnvironment to get estimation of locally 
> required memory
> ---
>
> Key: FLINK-12110
> URL: https://issues.apache.org/jira/browse/FLINK-12110
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Priority: Major
>
> At the moment, we use the static method TaskManagerServices.calculateHeapSizeMB 
> to estimate the local memory needed in the TM container for the network 
> environment. As the network environment becomes a pluggable shuffle service, 
> we should also make this memory request from the container to the shuffle 
> service pluggable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState

2020-10-09 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-13010.

Resolution: Later

Closing it for cleanup; we can consider reopening it in the future once this 
refactoring work becomes necessary and someone is available to do it.

> Refactor the process of SchedulerNG#requestPartitionState
> -
>
> Key: FLINK-13010
> URL: https://issues.apache.org/jira/browse/FLINK-13010
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Minor
>
> Currently `requestPartitionState` is mainly used to query the partition 
> state when the consumer receives a `PartitionNotFoundException` while 
> requesting a partition. But we do not actually have a concept of partition 
> state at the moment, and `requestPartitionState` returns the corresponding 
> producer's state as the result, so there is a contradiction here.
> My suggestion is to rename the method to `requestPartitionProducerState`; we 
> then do not need to pass the `IntermediateDataSetID` and `ResultPartitionID` 
> arguments to find the corresponding execution attempt. We could pass only the 
> `ExecutionAttemptID`, and the corresponding execution attempt can easily be 
> found from the mapping in `ExecutionGraph` (see the sketch below).
> Doing so would further allow removing `IntermediateDataSetID` from 
> `SingleInputGate` and possibly replacing `IntermediateDataSetID` with 
> `InputGateID` in `InputGateDeploymentDescriptor`.
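> A sketch of the suggested signature change (the surrounding types are 
> stubbed out here; only the proposal's names are used):
> {code:java}
> // Stand-ins for the runtime classes referenced by the proposal.
> final class ExecutionAttemptID { }
> 
> enum ExecutionState { CREATED, SCHEDULED, RUNNING, FINISHED, FAILED }
> 
> interface SchedulerSketch {
> 
>     // Before: requestPartitionState(IntermediateDataSetID, ResultPartitionID)
>     // returned the *producer's* state despite its name.
> 
>     // After: the ExecutionAttemptID alone identifies the producer attempt via
>     // the mapping in the ExecutionGraph, so the other IDs become unnecessary.
>     ExecutionState requestPartitionProducerState(ExecutionAttemptID producer);
> }
> {code}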



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19249) Job would wait sometime(~10 min) before failover if some connection broken

2020-09-29 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17204453#comment-17204453
 ] 

Zhijiang commented on FLINK-19249:
--

Thanks for reporting this, [~klion26], and thanks for all the discussion above.

I am curious why the downstream side only becomes aware of this problem after 
ten minutes in the network stack.
As we know, when the netty server (upstream) detects a physical network 
problem, it does two things:

* Sends an ErrorResponse message to the netty client (downstream);
* Closes the channel explicitly on its side after sending the above message.

So the downstream side actually relies on two mechanisms for failure detection 
and handling:
* The logical ErrorResponse message from the upstream side: if the downstream 
receives it from the network, it fails itself.
* The physical kernel mechanism: when the upstream closes its local channel, 
the downstream side also detects the inactive channel after some time (the TCP 
mechanism) and then fails itself, e.g. via `#channelInactive` in the handler.

If these two mechanisms are not always reliable in a bad network environment, 
or are delayed by kernel default settings, then we might provide another 
application-level mechanism for safety. One previously discussed option is to 
let the upstream report this network exception to the JobManager via RPC; the 
manager can then decide to cancel/fail the related tasks.

Regarding the other options such as `ReadTimeoutHandler`/`IdleStateHandler`, I 
wonder whether they might bring other side effects; they are also not always 
reliable and are limited by the network stack.
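For reference, a minimal sketch of the `IdleStateHandler` option mentioned 
above, using the plain Netty API (Flink shades these classes under 
`org.apache.flink.shaded.netty4`); whether its side effects are acceptable is 
exactly the open question:
{code:java}
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

public class ClientIdleDetection {

    // Closes the channel if nothing was read for 60s, instead of waiting for
    // the kernel-level TCP detection (~10 min in the reported case). Closing
    // triggers channelInactive and thus an earlier failover.
    public static void install(ChannelPipeline pipeline) {
        pipeline.addFirst("idleDetector", new IdleStateHandler(60, 0, 0));
        pipeline.addLast("idleReaction", new ChannelDuplexHandler() {
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                    throws Exception {
                if (evt instanceof IdleStateEvent) {
                    ctx.close();
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        });
    }
}
{code}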

> Job would wait sometime(~10 min) before failover if some connection broken
> --
>
> Key: FLINK-19249
> URL: https://issues.apache.org/jira/browse/FLINK-19249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Blocker
> Fix For: 1.12.0, 1.11.3
>
>
> {quote}encountered this error on 1.7, after going through the master code, I 
> think the problem is still there
> {quote}
> When the network environment is not so good, the connection between the 
> server and the client may be dropped unexpectedly. After the disconnection, 
> the server receives an IOException such as the one below:
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> and then releases the view reader.
> But the job does not fail until the downstream detects the disconnection via 
> {{channelInactive}} later (~10 min). During that time the job can still 
> process data, but the broken channel cannot transfer any data or events, so 
> snapshots fail during this period. This causes the job to replay a lot of 
> data after failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17331) Add NettyMessageContent interface for all the class which could be write to NettyMessage

2020-09-29 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17204445#comment-17204445
 ] 

Zhijiang commented on FLINK-17331:
--

After discussing offline with [~karmagyz], I understand the motivation now.

When writing the header of some NettyMessage instances into the network stack, 
e.g. BufferResponse, the lengths of the involved fields are hard-coded inside 
NettyMessage. So if we change such a field, e.g. InputChannelID, we are not 
made aware that the respective length in the related netty message components 
also needs to change. It would be better if we could establish some dependency 
between them, or at least detect the inconsistency early after such changes 
(see the sketch below).

This improvement is not critical or urgent, so feel free to pick it up 
whenever you want, [~karmagyz].
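A rough illustration of the kind of dependence I mean (names are illustrative; 
the interface is the one proposed in the description below):
{code:java}
import io.netty.buffer.ByteBuf;

// If every serializable field implements the proposed interface, the frame
// length is derived from the fields instead of hard-coded constants, so a
// change to e.g. InputChannelID cannot silently break the message length.
interface NettyMessageContent {
    void writeTo(ByteBuf buf);
    int getContentLen();
}

final class FrameLength {
    static int of(NettyMessageContent... fields) {
        int length = 0;
        for (NettyMessageContent field : fields) {
            length += field.getContentLen(); // no magic "16" for an ID field
        }
        return length;
    }
}
{code}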

> Add NettyMessageContent interface for all the class which could be write to 
> NettyMessage
> 
>
> Key: FLINK-17331
> URL: https://issues.apache.org/jira/browse/FLINK-17331
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Minor
>
> Currently, there are some classes, e.g. {{JobVertexID}} and 
> {{ExecutionAttemptID}}, that need to be written to {{NettyMessage}}. However, 
> the sizes of these classes in the {{ByteBuf}} are hard-coded in the 
> {{NettyMessage}} class, which is error-prone: if someone edits those classes, 
> there is no warning or error during the compile phase. I think it would be 
> better to add a {{NettyMessageContent}} (the name could be discussed) 
> interface:
> {code:java}
> public interface NettyMessageContent {
>     void writeTo(ByteBuf buf);
>     int getContentLen();
> }
> {code}
> Regarding {{fromByteBuf}}, since it is a static method, we could not add it 
> to the interface. We might explain it in the javadoc of 
> {{NettyMessageContent}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-17331) Add NettyMessageContent interface for all the class which could be write to NettyMessage

2020-09-29 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-17331:


Assignee: Yangze Guo

> Add NettyMessageContent interface for all the class which could be write to 
> NettyMessage
> 
>
> Key: FLINK-17331
> URL: https://issues.apache.org/jira/browse/FLINK-17331
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Minor
>
> Currently, there are some classes, e.g. {{JobVertexID}} and 
> {{ExecutionAttemptID}}, that need to be written to {{NettyMessage}}. However, 
> the sizes of these classes in the {{ByteBuf}} are hard-coded in the 
> {{NettyMessage}} class, which is error-prone: if someone edits those classes, 
> there is no warning or error during the compile phase. I think it would be 
> better to add a {{NettyMessageContent}} (the name could be discussed) 
> interface:
> {code:java}
> public interface NettyMessageContent {
>     void writeTo(ByteBuf buf);
>     int getContentLen();
> }
> {code}
> Regarding {{fromByteBuf}}, since it is a static method, we could not add it 
> to the interface. We might explain it in the javadoc of 
> {{NettyMessageContent}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17331) Add NettyMessageContent interface for all the class which could be write to NettyMessage

2020-09-29 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-17331:
-
Fix Version/s: (was: 1.12.0)

> Add NettyMessageContent interface for all the class which could be write to 
> NettyMessage
> 
>
> Key: FLINK-17331
> URL: https://issues.apache.org/jira/browse/FLINK-17331
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yangze Guo
>Priority: Minor
>
> Currently, there are some classes, e.g. {{JobVertexID}} and 
> {{ExecutionAttemptID}}, that need to be written to {{NettyMessage}}. However, 
> the sizes of these classes in the {{ByteBuf}} are hard-coded in the 
> {{NettyMessage}} class, which is error-prone: if someone edits those classes, 
> there is no warning or error during the compile phase. I think it would be 
> better to add a {{NettyMessageContent}} (the name could be discussed) 
> interface:
> {code:java}
> public interface NettyMessageContent {
>     void writeTo(ByteBuf buf);
>     int getContentLen();
> }
> {code}
> Regarding {{fromByteBuf}}, since it is a static method, we could not add it 
> to the interface. We might explain it in the javadoc of 
> {{NettyMessageContent}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19400) Removed unused BufferPoolOwner

2020-09-27 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17202759#comment-17202759
 ] 

Zhijiang commented on FLINK-19400:
--

+1 to remove. There are indeed no actual usages in the core code: 
`ResultPartition#releaseMemory` is empty, yet we have to pass a `nullable` 
owner in the related constructors and maintain the related logic.

> Removed unused BufferPoolOwner
> --
>
> Key: FLINK-19400
> URL: https://issues.apache.org/jira/browse/FLINK-19400
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Arvid Heise
>Assignee: Arvid Heise
>Priority: Major
>
> {{BufferPoolOwner}} does not have any production usages and just complicates 
> a few tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19297) Make ResultPartitionWriter record-oriented

2020-09-24 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-19297:


Assignee: Yingjie Cao

> Make ResultPartitionWriter record-oriented
> --
>
> Key: FLINK-19297
> URL: https://issues.apache.org/jira/browse/FLINK-19297
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, ResultPartitionWriter is buffer-oriented: RecordWriter adds 
> buffers for the different channels to the ResultPartitionWriter, and the 
> buffer boundary serves as a natural boundary between data belonging to 
> different channels. However, this abstraction is not flexible enough for 
> cases where records are appended to a joint structure shared by all channels 
> and sorting is used to cluster data belonging to different channels.
> In this ticket, we propose to make ResultPartitionWriter record-oriented, 
> which offers more flexibility to its implementations (see the sketch below). 
> Based on the new record-oriented interface, we will introduce the sort-merge 
> based blocking shuffle to Flink in the future.
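> A sketch contrasting the two interface shapes (signatures are illustrative, 
> not the actual Flink API):
> {code:java}
> import java.io.IOException;
> import java.nio.ByteBuffer;
> 
> interface BufferOrientedWriter {
>     // Today: the caller serializes records into per-channel buffers, and the
>     // writer only sees finished buffers, one target channel at a time.
>     void addBufferConsumer(Object bufferConsumer, int targetChannel);
> }
> 
> interface RecordOrientedWriter {
>     // Proposed: the writer receives the serialized record itself, so an
>     // implementation may append all channels' data to one shared structure
>     // and sort it by channel later (basis of the sort-merge blocking shuffle).
>     void emitRecord(ByteBuffer record, int targetChannel) throws IOException;
> }
> {code}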



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19299) NettyShuffleEnvironmentBuilder#setBufferSize does not take effect

2020-09-21 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-19299:


Assignee: Yingjie Cao

> NettyShuffleEnvironmentBuilder#setBufferSize does not take effect
> -
>
> Key: FLINK-19299
> URL: https://issues.apache.org/jira/browse/FLINK-19299
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, NettyShuffleEnvironmentBuilder#setBufferSize does not take effect 
> because the set value is never used when building the NettyShuffleEnvironment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-09-08 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191748#comment-17191748
 ] 

Zhijiang edited comment on FLINK-18832 at 9/8/20, 8:10 AM:
---

Merged in master: 13e0b355d1d9ba513671de1638d3c35edb6b96a3
Merged in release-1.11: 11edb3dd47999bcad94c93a204cd20fdd5360b41
Merged in release-1.10: 85c181745855db4396f28c53eb6b478040c902cb


was (Author: zjwang):
Merged in master: 13e0b355d1d9ba513671de1638d3c35edb6b96a3

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Assignee: Zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19155) ResultPartitionTest is unstable

2020-09-07 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-19155:


Assignee: Zhijiang

> ResultPartitionTest is unstable
> ---
>
> Key: FLINK-19155
> URL: https://issues.apache.org/jira/browse/FLINK-19155
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Aljoscha Krettek
>Assignee: Zhijiang
>Priority: Major
>  Labels: test-stability
>
> I saw a failure in {{testInitializeMoreStateThanBuffer()}}: 
> https://dev.azure.com/aljoschakrettek/Flink/_build/results?buildId=274=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374=5995



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-09-07 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-18832.

Resolution: Fixed

Merged in master: 13e0b355d1d9ba513671de1638d3c35edb6b96a3

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Assignee: Zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-07 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-19109:
-
Fix Version/s: (was: 1.10.3)

> Split Reader eats chained periodic watermarks
> -
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: David Anderson
>Assignee: Roman Khachatryan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> Attempting to generate watermarks chained to the Split Reader / 
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator results = env
>   .readTextFile(...)
>   .map(...)
>   .assignTimestampsAndWatermarks(bounded)
>   .keyBy(...)
>   .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via 
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that timer service is being 
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in 
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-07 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19109.

Resolution: Fixed

Merged in master: 361ccb3ad734490aa4df9aa9d0430a31680581db
Merged in release-1.11: bae7d4b7d821cc9cca3d17e1f432be22cd7dbf76

> Split Reader eats chained periodic watermarks
> -
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: David Anderson
>Assignee: Roman Khachatryan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> Attempting to generate watermarks chained to the Split Reader / 
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator results = env
>   .readTextFile(...)
>   .map(...)
>   .assignTimestampsAndWatermarks(bounded)
>   .keyBy(...)
>   .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via 
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that timer service is being 
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in 
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-07 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17191577#comment-17191577
 ] 

Zhijiang commented on FLINK-19109:
--

[~zhuzh] [~roman_khachatryan], I will merge it into 1.11 after the Azure build 
passes; I think that can be done later today.

> Split Reader eats chained periodic watermarks
> -
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: David Anderson
>Assignee: Roman Khachatryan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> Attempting to generate watermarks chained to the Split Reader / 
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator results = env
>   .readTextFile(...)
>   .map(...)
>   .assignTimestampsAndWatermarks(bounded)
>   .keyBy(...)
>   .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via 
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that timer service is being 
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in 
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19093) "Elasticsearch (v6.3.1) sink end-to-end test" failed with "SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable 1"

2020-09-02 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19093.

Resolution: Fixed

Merged in master: 76f37551ad51f7eae9688495130a39b0f202c3bf

> "Elasticsearch (v6.3.1) sink end-to-end test" failed with 
> "SubtaskCheckpointCoordinatorImpl was closed without closing 
> asyncCheckpointRunnable 1"
> -
>
> Key: FLINK-19093
> URL: https://issues.apache.org/jira/browse/FLINK-19093
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5986=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 2020-08-29T22:20:02.3500263Z 2020-08-29 22:20:00,851 INFO  
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: 
> Sequence Source -> Flat Map -> Sink: Unnamed (1/1) - asynchronous part of 
> checkpoint 1 could not be completed.
> 2020-08-29T22:20:02.3501112Z java.lang.IllegalStateException: 
> SubtaskCheckpointCoordinatorImpl was closed without closing 
> asyncCheckpointRunnable 1
> 2020-08-29T22:20:02.3502049Z  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) 
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3503280Z  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.registerAsyncCheckpointRunnable(SubtaskCheckpointCoordinatorImpl.java:371)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3504647Z  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.lambda$registerConsumer$2(SubtaskCheckpointCoordinatorImpl.java:479)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3505882Z  at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:95)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3506614Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_265]
> 2020-08-29T22:20:02.3507203Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_265]
> 2020-08-29T22:20:02.3507685Z  at java.lang.Thread.run(Thread.java:748) 
> [?:1.8.0_265]
> 2020-08-29T22:20:02.3509577Z 2020-08-29 22:20:00,927 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
> TaskSlot(index:0, state:ACTIVE, resource profile: 
> ResourceProfile{cpuCores=1., taskHeapMemory=384.000mb 
> (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb 
> (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 
> ca890bc4df19c66146370647d07bf510, jobId: 3522a3e4940d4b3cefc6dc1f22123f4b).
> 2020-08-29T22:20:02.3511425Z 2020-08-29 22:20:00,939 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
> 3522a3e4940d4b3cefc6dc1f22123f4b from job leader monitoring.
> 2020-08-29T22:20:02.3512499Z 2020-08-29 22:20:00,939 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
> JobManager connection for job 3522a3e4940d4b3cefc6dc1f22123f4b.
> 2020-08-29T22:20:02.3513174Z Checking for non-empty .out files...
> 2020-08-29T22:20:02.3513706Z No non-empty .out files.
> 2020-08-29T22:20:02.3513878Z 
> 2020-08-29T22:20:02.3514679Z [FAIL] 'Elasticsearch (v6.3.1) sink end-to-end 
> test' failed after 0 minutes and 37 seconds! Test exited with exit code 0 but 
> the logs contained errors, exceptions or non-empty .out files
> 2020-08-29T22:20:02.3515138Z 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15981) Control the direct memory in FileChannelBoundedData.FileBufferReader

2020-09-01 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-15981:
-
Affects Version/s: 1.10.1
   1.11.0
   1.10.2
   1.11.1

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> 
>
> Key: FLINK-15981
> URL: https://issues.apache.org/jira/browse/FLINK-15981
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: Jingsong Lee
>Priority: Critical
>
> Now, the default blocking BoundedData is FileChannelBoundedData. Its reader 
> creates a new 64 KB direct buffer.
> When the parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid a direct memory OOM. This is 
> hard to configure and costs a lot of memory: with a parallelism of 1000, a 
> task manager may need 1 GB+.
> This is not conducive to scenarios with few slots and large parallelism. 
> Batch jobs could run little by little, but the memory demand would still be 
> large.
> If we provide N-input operators, things may become worse, because the number 
> of subpartitions that can be requested at the same time grows, and we have no 
> idea how much memory that needs.
> Here are my rough thoughts:
>  * Obtain the memory from the network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time".
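To illustrate the second thought above, a minimal sketch of capping concurrent 
readers with a small pool of reusable direct buffers; class names and sizes are 
illustrative, not the actual Flink implementation:

{code:java}
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative only: bounds the direct memory used by file readers to
// maxConcurrentReaders * bufferSize, instead of one 64 KB buffer per reader.
final class ReadBufferPool {
    private final ArrayBlockingQueue<ByteBuffer> buffers;

    ReadBufferPool(int maxConcurrentReaders, int bufferSize) {
        this.buffers = new ArrayBlockingQueue<>(maxConcurrentReaders);
        for (int i = 0; i < maxConcurrentReaders; i++) {
            buffers.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Blocks when all buffers are in use, which caps the number of
    // subpartitions that can be read at the same time.
    ByteBuffer take() throws InterruptedException {
        return buffers.take();
    }

    void recycle(ByteBuffer buffer) {
        buffer.clear();
        buffers.offer(buffer);
    }
}
{code}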



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19054) KafkaTableITCase.testKafkaSourceSink hangs

2020-08-31 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188111#comment-17188111
 ] 

Zhijiang commented on FLINK-19054:
--

Another instance: 
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/6003/logs/111

> KafkaTableITCase.testKafkaSourceSink hangs
> --
>
> Key: FLINK-19054
> URL: https://issues.apache.org/jira/browse/FLINK-19054
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5844=logs=c5f0071e-1851-543e-9a45-9ac140befc32=684b1416-4c17-504e-d5ab-97ee44e08a20]
> {code}
> 2020-08-25T09:04:57.3569768Z "Kafka Fetcher for Source: 
> KafkaTableSource(price, currency, log_date, log_time, log_ts) -> 
> SourceConversion(table=[default_catalog.default_database.kafka, source: 
> [KafkaTableSource(price, currency, log_date, log_time, log_ts)]], 
> fields=[price, currency, log_date, log_time, log_ts]) -> Calc(select=[(price 
> + 1.0:DECIMAL(2, 1)) AS computed-price, price, currency, log_date, log_time, 
> log_ts, (log_ts + 1000:INTERVAL SECOND) AS ts]) -> 
> WatermarkAssigner(rowtime=[ts], watermark=[ts]) -> Calc(select=[ts, log_date, 
> log_time, CAST(ts) AS ts0, price]) (1/1)" #1501 daemon prio=5 os_prio=0 
> tid=0x7f25b800 nid=0x22b8 runnable [0x7f2127efd000]
> 2020-08-25T09:04:57.3571373Zjava.lang.Thread.State: RUNNABLE
> 2020-08-25T09:04:57.3571672Z  at sun.nio.ch.FileDispatcherImpl.read0(Native 
> Method)
> 2020-08-25T09:04:57.3572191Z  at 
> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> 2020-08-25T09:04:57.3572921Z  at 
> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> 2020-08-25T09:04:57.3573419Z  at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> 2020-08-25T09:04:57.3573957Z  at 
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
> 2020-08-25T09:04:57.3574809Z  - locked <0xfde5a308> (a 
> java.lang.Object)
> 2020-08-25T09:04:57.3575448Z  at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> 2020-08-25T09:04:57.3576309Z  at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> 2020-08-25T09:04:57.3577086Z  at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> 2020-08-25T09:04:57.3577727Z  at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> 2020-08-25T09:04:57.3578403Z  at 
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> 2020-08-25T09:04:57.3579486Z  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> 2020-08-25T09:04:57.3580240Z  at 
> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> 2020-08-25T09:04:57.3580880Z  at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> 2020-08-25T09:04:57.3581756Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> 2020-08-25T09:04:57.3583015Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> 2020-08-25T09:04:57.3583847Z  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> 2020-08-25T09:04:57.3584555Z  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> 2020-08-25T09:04:57.3585197Z  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
> 2020-08-25T09:04:57.3585961Z  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18694) Add unaligned checkpoint config to web ui

2020-08-30 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-18694.

Resolution: Fixed

Merged in master: 56ee6b49c594ac5da12a8d5d666d61bccf9e4083

> Add unaligned checkpoint config to web ui
> -
>
> Key: FLINK-18694
> URL: https://issues.apache.org/jira/browse/FLINK-18694
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Kboh
>Assignee: Kboh
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> h2. What is the purpose of the change
>  * Show in web ui if unaligned checkpoints are enabled.
> h2. Brief change log
>  * Adds unaligned checkpoint config to REST endpoint, and web ui.
>  
> [https://github.com/apache/flink/pull/12962]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18694) Add unaligned checkpoint config to web ui

2020-08-30 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-18694:
-
Fix Version/s: 1.12.0

> Add unaligned checkpoint config to web ui
> -
>
> Key: FLINK-18694
> URL: https://issues.apache.org/jira/browse/FLINK-18694
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Kboh
>Assignee: Kboh
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> h2. What is the purpose of the change
>  * Show in web ui if unaligned checkpoints are enabled.
> h2. Brief change log
>  * Adds unaligned checkpoint config to REST endpoint, and web ui.
>  
> [https://github.com/apache/flink/pull/12962]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18694) Add unaligned checkpoint config to web ui

2020-08-30 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18694:


Assignee: Kboh

> Add unaligned checkpoint config to web ui
> -
>
> Key: FLINK-18694
> URL: https://issues.apache.org/jira/browse/FLINK-18694
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Kboh
>Assignee: Kboh
>Priority: Trivial
>  Labels: pull-request-available
>
> h2. What is the purpose of the change
>  * Show in web ui if unaligned checkpoints are enabled.
> h2. Brief change log
>  * Adds unaligned checkpoint config to REST endpoint, and web ui.
>  
> [https://github.com/apache/flink/pull/12962]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19012) E2E test fails with "Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument."

2020-08-28 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19012.

Resolution: Fixed

Merged in master: 378115ffd1642c782c7d0f203c5c46e07d18fc23

> E2E test fails with "Cannot register Closeable, this 
> subtaskCheckpointCoordinator is already closed. Closing argument."
> ---
>
> Key: FLINK-19012
> URL: https://issues.apache.org/jira/browse/FLINK-19012
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Task, Tests
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> Note: This error occurred in a custom branch with unreviewed changes. I don't 
> believe my changes affect this error, but I would keep this in mind when 
> investigating the error: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8307=logs=1f3ed471-1849-5d3c-a34c-19792af4ad16=0d2e35fc-a330-5cf2-a012-7267e2667b1d
>  
> {code}
> 2020-08-20T20:55:30.2400645Z 2020-08-20 20:55:22,373 INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Registering 
> task at network: Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> (cbc357ccb763df2852fee8c4fc7d55f2_0_0) [DEPLOYING].
> 2020-08-20T20:55:30.2402392Z 2020-08-20 20:55:22,401 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask  [] - No state 
> backend has been configured, using default (Memory / JobManager) 
> MemoryStateBackend (data in heap memory / checkpoints to JobManager) 
> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 
> 5242880)
> 2020-08-20T20:55:30.2404297Z 2020-08-20 20:55:22,413 INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Source: 
> Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> (cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to RUNNING.
> 2020-08-20T20:55:30.2405805Z 2020-08-20 20:55:22,786 INFO  
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge
>  [] - Pinging Elasticsearch cluster via hosts [http://127.0.0.1:9200] ...
> 2020-08-20T20:55:30.2407027Z 2020-08-20 20:55:22,848 INFO  
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge
>  [] - Elasticsearch RestHighLevelClient is connected to 
> [http://127.0.0.1:9200]
> 2020-08-20T20:55:30.2409277Z 2020-08-20 20:55:29,205 INFO  
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
>  [] - Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) discarding 0 
> drained requests
> 2020-08-20T20:55:30.2410690Z 2020-08-20 20:55:29,218 INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Source: 
> Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> (cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FINISHED.
> 2020-08-20T20:55:30.2412187Z 2020-08-20 20:55:29,218 INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Freeing 
> task resources for Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> (cbc357ccb763df2852fee8c4fc7d55f2_0_0).
> 2020-08-20T20:55:30.2414203Z 2020-08-20 20:55:29,224 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
> Un-registering task and sending final execution state FINISHED to JobManager 
> for task Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> cbc357ccb763df2852fee8c4fc7d55f2_0_0.
> 2020-08-20T20:55:30.2415602Z 2020-08-20 20:55:29,219 INFO  
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: 
> Sequence Source -> Flat Map -> Sink: Unnamed (1/1) - asynchronous part of 
> checkpoint 1 could not be completed.
> 2020-08-20T20:55:30.2416411Z java.io.UncheckedIOException: 
> java.io.IOException: Cannot register Closeable, this 
> subtaskCheckpointCoordinator is already closed. Closing argument.
> 2020-08-20T20:55:30.2418956Z  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.lambda$registerConsumer$2(SubtaskCheckpointCoordinatorImpl.java:468)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-20T20:55:30.2420100Z  at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:91)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-20T20:55:30.2420927Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_265]
> 2020-08-20T20:55:30.2421455Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_265]
> 2020-08-20T20:55:30.2421879Z  at 

[jira] [Closed] (FLINK-19003) Add micro-benchmark for unaligned checkpoints

2020-08-27 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-19003.

Resolution: Fixed

Merged in master: de049572690f4f744c9706dba5268e242f9e3bc8

> Add micro-benchmark for unaligned checkpoints
> -
>
> Key: FLINK-19003
> URL: https://issues.apache.org/jira/browse/FLINK-19003
> Project: Flink
>  Issue Type: Task
>  Components: Benchmarks, Runtime / Checkpointing
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
>  Labels: pull-request-available
>
> It is necessary to add an unaligned checkpoint micro-benchmark to verify our 
> upcoming improvements and to catch any regressions in the future.
> The benchmark should cover remote and local channels separately, since they 
> take different code paths, and it also needs to guarantee that there are some 
> in-flight buffers during the checkpoint for measuring the channel state 
> snapshot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-08-26 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184963#comment-17184963
 ] 

Zhijiang commented on FLINK-18695:
--

If I understood correctly, there seem to be three options to unblock the netty 
version upgrade and support heap memory allocation:
* 1. NettyBufferPool#heapBuffer() just takes the internal implementation of 
`#directBuffer()` instead, as [~chesnay] mentioned above.
* 2. `#heapBuffer()` keeps the original internal implementation, and we assign 
the same number of heap arenas as we do for direct arenas.
* 3. `#heapBuffer()` allocates a temporary ByteBuf, bypassing netty's internal 
memory management without heap arenas.

The second option might enlarge the total heap memory overhead because it 
introduces heap arenas, while the ssl engine seems to need only a little heap 
memory in practice.

The third option might reduce the total heap memory overhead compared with the 
second option, but it might not be GC-friendly. It could also bring potential 
risks in the future if heap memory allocation becomes frequent in other parts.

Therefore I slightly prefer the first option, since it can make use of the 
existing direct arenas and avoids extra heap memory overhead in the framework. 
After upgrading the netty version, the effect should be tiny or even negligible, 
because the previous direct memory usage caused by ssl is unchanged and the 
newly introduced direct memory overhead (heap -> direct) seems really small 
according to [~gaoyunhaii]'s testing results.
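To make the first option concrete, a minimal sketch built on Netty's public 
PooledByteBufAllocator API; this is not Flink's actual NettyBufferPool, and the 
constructor arguments just mirror Netty's defaults:

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Sketch of option 1: serve heap-buffer requests from the existing direct
// arenas so no heap arenas have to be provisioned.
public class DirectBackedBufferPool extends PooledByteBufAllocator {

    public DirectBackedBufferPool(int numberOfArenas) {
        // preferDirect=true, 0 heap arenas, numberOfArenas direct arenas,
        // 8 KiB pages, maxOrder 11 (Netty defaults).
        super(true, 0, numberOfArenas, 8192, 11);
    }

    @Override
    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
        // Delegate to the direct arenas instead of allocating on the heap.
        return directBuffer(initialCapacity, maxCapacity);
    }
}
{code}

Since the no-arg and one-arg heapBuffer() variants route through this two-arg 
method, a single override covers all heap-buffer requests.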


> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.12.0
>
>
> in 4.1.43 netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18955) Add snapshot path to job startup message

2020-08-24 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18955:


Assignee: Yuan Mei

> Add snapshot path to job startup message
> 
>
> Key: FLINK-18955
> URL: https://issues.apache.org/jira/browse/FLINK-18955
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Nico Kruber
>Assignee: Yuan Mei
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> When a job is started from a checkpoint or savepoint (I'm using snapshot as 
> the unified term below), the {{CheckpointCoordinator}} prints a log line 
> like this:
> {code}
> 2020-08-13 13:50:51,418 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: Checkpoint 
> 357 @ 1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0.
> {code}
> I propose to add the snapshot path to this message, because it may not be 
> obvious to the user which snapshot is taken for the restore: 
> even if a savepoint was specified in the job start command, e.g. in a 
> Kubernetes pod spec, an HA store could overrule the decision and take a more 
> recent snapshot instead. If that snapshot is a savepoint, it is not that easy 
> to map this to checkpoint IDs and find out which savepoint the job actually 
> started from.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19003) Add micro-benchmark for unaligned checkpoints

2020-08-19 Thread Zhijiang (Jira)
Zhijiang created FLINK-19003:


 Summary: Add micro-benchmark for unaligned checkpoints
 Key: FLINK-19003
 URL: https://issues.apache.org/jira/browse/FLINK-19003
 Project: Flink
  Issue Type: Task
  Components: Benchmarks, Runtime / Checkpointing
Reporter: Zhijiang
Assignee: Zhijiang


It is necessary to add an unaligned checkpoint micro-benchmark to verify our 
upcoming improvements and to catch any regressions in the future.

The benchmark should cover remote and local channels separately, since they 
take different code paths, and it also needs to guarantee that there are some 
in-flight buffers during the checkpoint for measuring the channel state 
snapshot.
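As a rough illustration of the shape such a benchmark could take, a 
self-contained JMH skeleton; the @Param mirrors the remote/local split, and the 
measured body is a stand-in, not Flink's actual checkpoint path:

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class UnalignedCheckpointBenchmarkSketch {

    // Covers the two code paths separately, as described above.
    @Param({"LOCAL", "REMOTE"})
    public String channelType;

    private byte[] inFlightBuffer;

    @Setup(Level.Iteration)
    public void setUp() {
        // Stand-in for "some in-flight buffers during checkpoint".
        inFlightBuffer = new byte[32 * 1024];
    }

    @Benchmark
    public void snapshotChannelState(Blackhole bh) {
        // Stand-in for persisting the in-flight channel state.
        bh.consume(inFlightBuffer.clone());
    }
}
{code}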



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17176817#comment-17176817
 ] 

Zhijiang commented on FLINK-18832:
--

[~pnowojski] Agree with your proposal. It would be nice to give a friendly 
message for the current limitation. I will submit the PR for it.

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Assignee: Zhijiang
>Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18821) Netty client retry mechanism may cause PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely

2020-08-12 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17176163#comment-17176163
 ] 

Zhijiang edited comment on FLINK-18821 at 8/13/20, 3:46 AM:


Merged in release-1.10: e581d7f8abeac8951735afeb8d3b90977bf10c0a
Merged in release-1.11: ded539e85444f9c8d34f88da31784775ed281aba
Merged in master: 33bdc978a059c50ec55b8f24d40039d13a6b78e7


was (Author: zjwang):
Merged in release-1.11: ded539e85444f9c8d34f88da31784775ed281aba
Merged in master: 33bdc978a059c50ec55b8f24d40039d13a6b78e7

> Netty client retry mechanism may cause 
> PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely
> 
>
> Key: FLINK-18821
> URL: https://issues.apache.org/jira/browse/FLINK-18821
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Caizhi Weng
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> When running TPCDS 10T benchmark on Flink I found some of the task slots 
> stuck. After some investigation there seems to be a bug in 
> {{PartitionRequestClientFactory}}.
> When a task tries to request a partition of data from its upstream task but 
> fails, {{PartitionRequestClientFactory#connect}} will throw 
> {{RemoteTransportException}} and 
> {{PartitionRequestClientFactory#connectWithRetries}} will throw 
> {{CompletionException}}. However this exception is not caught by 
> {{PartitionRequestClientFactory#connect}} and it will eventually fail the 
> task.
> But {{PartitionRequestClientFactory}} lives in a task manager not in a task. 
> In {{PartitionRequestClientFactory}} a {{ConcurrentHashMap}} named 
> {{clients}} is maintained for reusing {{NettyPartitionRequestClient}}. When 
> the above exception happens, {{clients}} is not cleaned up; When the next 
> call to {{PartitionRequestClientFactory#connect}} with the same connection id 
> comes, it will use the invalid {{CompletableFuture}} in {{clients}} and this 
> future will never complete, causing the task to get stuck forever.
> Exception stack:
> {code}
> 2020-08-05 03:37:07,539 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory [] - 
> Failed 1 times to connect to /:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager '/:' has failed. This 
> might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:120)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:99)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:76)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:146)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:329)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:301)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:95)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.requestPartitions(StreamTask.java:514)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.readRecoveredChannelState(StreamTask.java:484)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:475)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> 
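A minimal sketch of the cleanup pattern this bug calls for; all names here are 
illustrative placeholders, not the actual Flink classes. The cached future must 
be evicted on failure so the next connect attempt retries instead of waiting on 
a future that will never complete:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ConnectionCache {
    private final ConcurrentHashMap<String, CompletableFuture<Object>> clients =
            new ConcurrentHashMap<>();

    CompletableFuture<Object> connect(String connectionId) {
        CompletableFuture<Object> future =
                clients.computeIfAbsent(connectionId, this::openConnection);
        // Evict the failed future; remove(key, value) only removes this
        // exact entry, so a newer successful future is never dropped.
        future.whenComplete((client, failure) -> {
            if (failure != null) {
                clients.remove(connectionId, future);
            }
        });
        return future;
    }

    private CompletableFuture<Object> openConnection(String id) {
        // Placeholder for the real netty connect-with-retries logic.
        return new CompletableFuture<>();
    }
}
{code}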

[jira] [Closed] (FLINK-18821) Netty client retry mechanism may cause PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely

2020-08-12 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-18821.

Resolution: Fixed

Merged in release-1.11: ded539e85444f9c8d34f88da31784775ed281aba
Merged in master: 33bdc978a059c50ec55b8f24d40039d13a6b78e7

> Netty client retry mechanism may cause 
> PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely
> 
>
> Key: FLINK-18821
> URL: https://issues.apache.org/jira/browse/FLINK-18821
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Caizhi Weng
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> When running TPCDS 10T benchmark on Flink I found some of the task slots 
> stuck. After some investigation there seems to be a bug in 
> {{PartitionRequestClientFactory}}.
> When a task tries to request a partition of data from its upstream task but 
> fails, {{PartitionRequestClientFactory#connect}} will throw 
> {{RemoteTransportException}} and 
> {{PartitionRequestClientFactory#connectWithRetries}} will throw 
> {{CompletionException}}. However this exception is not caught by 
> {{PartitionRequestClientFactory#connect}} and it will eventually fail the 
> task.
> But {{PartitionRequestClientFactory}} lives in a task manager not in a task. 
> In {{PartitionRequestClientFactory}} a {{ConcurrentHashMap}} named 
> {{clients}} is maintained for reusing {{NettyPartitionRequestClient}}. When 
> the above exception happens, {{clients}} is not cleaned up; When the next 
> call to {{PartitionRequestClientFactory#connect}} with the same connection id 
> comes, it will use the invalid {{CompletableFuture}} in {{clients}} and this 
> future will never complete, causing the task to get stuck forever.
> Exception stack:
> {code}
> 2020-08-05 03:37:07,539 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory [] - 
> Failed 1 times to connect to /:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager '/:' has failed. This 
> might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:120)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:99)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:76)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:146)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:329)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:301)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:95)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.requestPartitions(StreamTask.java:514)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.readRecoveredChannelState(StreamTask.java:484)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:475)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  

[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-11 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175960#comment-17175960
 ] 

Zhijiang commented on FLINK-18832:
--

Thanks for the explanation [~dwysakowicz]. 

If so, I think we can bypass this issue: if in the future we want to unify the 
DataStream API to simulate batch jobs, we can explicitly setBufferTimeout(-1). 
ATM, the blink planner and the BatchTask from the DataSet API already set 
setBufferTimeout(-1) explicitly.

If nobody has other concerns, I will close this ticket now. If we want to 
support a buffer timeout for batch jobs in the future, we can focus on it then.
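For context, disabling the output flusher from the DataStream API is a 
one-liner (sketch only; -1 means "flush only when a buffer is full"):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableOutputFlusher {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // -1 disables the periodic OutputFlusher thread: buffers are sent only
        // when full (or at the end of input), which is what batch-style jobs want.
        env.setBufferTimeout(-1);
    }
}
{code}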

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Assignee: Zhijiang
>Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18796) FlinkKinesisProducer.backpressureLatch should be volatile

2020-08-06 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-18796.

Resolution: Fixed

> FlinkKinesisProducer.backpressureLatch should be volatile
> -
>
> Key: FLINK-18796
> URL: https://issues.apache.org/jira/browse/FLINK-18796
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.11.1
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> (confirm first)
>  
> cc: [~rmetzger]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18796) FlinkKinesisProducer.backpressureLatch should be volatile

2020-08-06 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17172778#comment-17172778
 ] 

Zhijiang commented on FLINK-18796:
--

Merged in master: 33bdc978a059c50ec55b8f24d40039d13a6b78e7

> FlinkKinesisProducer.backpressureLatch should be volatile
> -
>
> Key: FLINK-18796
> URL: https://issues.apache.org/jira/browse/FLINK-18796
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.11.1
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> (confirm first)
>  
> cc: [~rmetzger]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-05 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18832:


Assignee: Zhijiang

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Assignee: Zhijiang
>Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-05 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171894#comment-17171894
 ] 

Zhijiang edited comment on FLINK-18832 at 8/6/20, 4:15 AM:
---

Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes there is no concurrent 
access to the `#flushCurrentBuffer` method. But actually the task thread and the 
flusher thread can invoke this method concurrently. I can think of some options 
for resolving it:

* Disable the flusher thread for batch jobs, because it brings no latency 
benefit: with the current scheduling, the downstream only requests the partition 
after the upstream finishes. It would even harm the upstream writer by spilling 
partial buffers after a flush is triggered.
* As a long-term goal, delegate the flusher thread to the mailbox model, so we 
avoid concurrency issues even if the flusher timeout is enabled for batch jobs.
* Break the previous assumption and allow concurrent access to 
`BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I 
remember [~pnowojski] already submitted a PR for it before, but it has not been 
merged yet. If that cannot be realized in a short time, then I prefer the first 
option as a workaround. WDYT?


was (Author: zjwang):
Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes no concurrent issue 
for `#flushCurrentBuffer` method. But actually the task thread and flusher 
thread can touch this method concurrently. I can thought of some options for 
resolving it:

* Disable flusher thread for batch jobs, because it has no benefits for latency 
concern as the downstream will only request partition after upstream finishes 
based on current schedule way. Even it would bring harm for upstream writer to 
spill partial buffer after flush triggered.
* From long term goal, the flusher thread should be delegated by mailbox model, 
so we can avoid concurrent issue even if the flusher timeout valid for batch 
jobs.
* Breaks the previous assumption to allow concurrent access of  
`BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I 
remembered [~pnowojski]already submitted the PR for it before, but have not 
merged yet. If this way can not be realized in short time, then i prefer the 
first option to work around. WDYT?

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> 

[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-05 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171894#comment-17171894
 ] 

Zhijiang commented on FLINK-18832:
--

Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes there is no concurrent 
access to the `#flushCurrentBuffer` method. But actually the task thread and the 
flusher thread can invoke this method concurrently. I can think of some options 
for resolving it:

* Disable the flusher thread for batch jobs, because it brings no latency 
benefit: with the current scheduling, the downstream only requests the partition 
after the upstream finishes. It would even harm the upstream writer by spilling 
partial buffers after a flush is triggered.
* As a long-term goal, delegate the flusher thread to the mailbox model, so we 
avoid concurrency issues even if the flusher timeout is enabled for batch jobs.
* Break the previous assumption and allow concurrent access to 
`BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I 
remember [~pnowojski] already submitted a PR for it before, but it has not been 
merged yet. If that cannot be realized in a short time, then I prefer the first 
option as a workaround. WDYT?

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.10.1, 1.12.0, 1.11.1
>Reporter: Till Rohrmann
>Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: 
> org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: 
> refCnt: 0, increment: 1
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at 
> org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18821) Netty client retry mechanism may cause PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely

2020-08-05 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18821:


Assignee: Roman Khachatryan  (was: Roman Kishchenko)

> Netty client retry mechanism may cause 
> PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely
> 
>
> Key: FLINK-18821
> URL: https://issues.apache.org/jira/browse/FLINK-18821
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Caizhi Weng
>Assignee: Roman Khachatryan
>Priority: Major
>
> When running TPCDS 10T benchmark on Flink I found some of the task slots 
> stuck. After some investigation there seems to be a bug in 
> {{PartitionRequestClientFactory}}.
> When a task tries to request a partition of data from its upstream task but 
> fails, {{PartitionRequestClientFactory#connect}} will throw 
> {{RemoteTransportException}} and 
> {{PartitionRequestClientFactory#connectWithRetries}} will throw 
> {{CompletionException}}. However this exception is not caught by 
> {{PartitionRequestClientFactory#connect}} and it will eventually fail the 
> task.
> But {{PartitionRequestClientFactory}} lives in a task manager not in a task. 
> In {{PartitionRequestClientFactory}} a {{ConcurrentHashMap}} named 
> {{clients}} is maintained for reusing {{NettyPartitionRequestClient}}. When 
> the above exception happens, {{clients}} is not cleaned up; When the next 
> call to {{PartitionRequestClientFactory#connect}} with the same connection id 
> comes, it will use the invalid {{CompletableFuture}} in {{clients}} and this 
> future will never complete, causing the task to get stuck forever.
> Exception stack:
> {code}
> 2020-08-05 03:37:07,539 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory [] - 
> Failed 1 times to connect to /:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager '/:' has failed. This 
> might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:120)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:99)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:76)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:146)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:329)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:301)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:95)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.requestPartitions(StreamTask.java:514)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.readRecoveredChannelState(StreamTask.java:484)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:475)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 

[jira] [Assigned] (FLINK-18821) Netty client retry mechanism may cause PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely

2020-08-05 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18821:


Assignee: Roman Kishchenko

> Netty client retry mechanism may cause 
> PartitionRequestClientFactory#createPartitionRequestClient to wait infinitely
> 
>
> Key: FLINK-18821
> URL: https://issues.apache.org/jira/browse/FLINK-18821
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Caizhi Weng
>Assignee: Roman Kishchenko
>Priority: Major
>
> When running TPCDS 10T benchmark on Flink I found some of the task slots 
> stuck. After some investigation there seems to be a bug in 
> {{PartitionRequestClientFactory}}.
> When a task tries to request a partition of data from its upstream task but 
> the request fails, {{PartitionRequestClientFactory#connect}} will throw 
> {{RemoteTransportException}} and 
> {{PartitionRequestClientFactory#connectWithRetries}} will throw 
> {{CompletionException}}. However, this exception is not caught by 
> {{PartitionRequestClientFactory#connect}}, and it will eventually fail the 
> task.
> But {{PartitionRequestClientFactory}} lives in a task manager, not in a task. 
> In {{PartitionRequestClientFactory}} a {{ConcurrentHashMap}} named 
> {{clients}} is maintained for reusing {{NettyPartitionRequestClient}}. When 
> the above exception happens, {{clients}} is not cleaned up; when the next 
> call to {{PartitionRequestClientFactory#connect}} with the same connection id 
> arrives, it will use the invalid {{CompletableFuture}} in {{clients}}, and 
> this future will never complete, causing the task to be stuck forever.
> Exception stack:
> {code}
> 2020-08-05 03:37:07,539 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory [] - 
> Failed 1 times to connect to /:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager '/:' has failed. This 
> might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:120)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:99)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:76)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:146)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:329)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:301)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:95)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.requestPartitions(StreamTask.java:514)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.readRecoveredChannelState(StreamTask.java:484)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:475)
>  ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> 
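
A minimal, self-contained sketch of the caching pattern described in this 
ticket, with hypothetical, simplified names (this is not the actual Flink 
code): the future is put into the map before the connection succeeds, so a 
failed connect must both evict the entry and complete the future 
exceptionally; otherwise every later caller waits on it forever.

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class PartitionClientCacheSketch {

    private final ConcurrentHashMap<String, CompletableFuture<String>> clients =
            new ConcurrentHashMap<>();

    public String createPartitionRequestClient(String connectionId) throws Exception {
        AtomicBoolean owner = new AtomicBoolean(false);
        CompletableFuture<String> future = clients.computeIfAbsent(connectionId, id -> {
            owner.set(true);
            return new CompletableFuture<>();
        });
        if (owner.get()) {
            try {
                future.complete(connect(connectionId)); // may throw
            } catch (Exception e) {
                // Without these two lines the incomplete future stays cached
                // and every subsequent caller blocks on get() forever, which
                // matches the behavior reported in this ticket.
                clients.remove(connectionId, future);
                future.completeExceptionally(e);
                throw e;
            }
        }
        return future.get(); // non-owners wait for the owning thread's result
    }

    private String connect(String connectionId) throws Exception {
        // stand-in for the Netty connect-with-retries logic
        throw new java.io.IOException("connect failed: " + connectionId);
    }
}
{code}

The two-argument {{remove(key, value)}} evicts only the exact failed future, 
so an entry that has already been replaced by a concurrent retry is not 
accidentally discarded.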

[jira] [Closed] (FLINK-18767) Streaming job stuck when disabling operator chaining

2020-08-03 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang closed FLINK-18767.

Resolution: Not A Problem

> Streaming job stuck when disabling operator chaining
> 
>
> Key: FLINK-18767
> URL: https://issues.apache.org/jira/browse/FLINK-18767
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.3, 1.10.1, 1.11.1
>Reporter: Nico Kruber
>Assignee: Zhijiang
>Priority: Critical
>
> The following code is stuck sending data from the source to the map operator. 
> Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} 
> and {{env.disableOperatorChaining();}} - if I remove either of these, the job 
> works as expected.
> (I pre-populated my Kafka topic with one element to reproduce easily)
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // comment either these two and the job works
> env.setBufferTimeout(-1);
> env.disableOperatorChaining(); 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", 
> new SimpleStringSchema(), properties);
> consumer.setStartFromEarliest();
> DataStreamSource<String> input = env.addSource(consumer);
> input
> .map((x) -> x)
> .print();
> env.execute();
> {code}
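
For context on the resolution: {{setBufferTimeout(-1)}} disables time-based 
output flushing, so a buffer is only sent downstream once it is full, and with 
a single pre-populated record the output buffer may simply never fill up, 
which would be consistent with the job appearing stuck rather than being 
broken. A minimal sketch of the relevant setting (standard DataStream API; 
the 100 ms value is illustrative, not a recommendation):

{code}
// A sketch, not the reporter's code: with a non-negative timeout, partially
// filled output buffers are flushed after at most the given number of
// milliseconds, so low-volume sources still emit records downstream.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(100);     // flush partially filled buffers after <= 100 ms
env.disableOperatorChaining(); // chaining can stay disabled with this setting
{code}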



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18767) Streaming job stuck when disabling operator chaining

2020-08-03 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169904#comment-17169904
 ] 

Zhijiang commented on FLINK-18767:
--

After confirming with [~NicoK] offline: there was some misunderstanding earlier 
and this is not a real problem, so I am closing it.

> Streaming job stuck when disabling operator chaining
> 
>
> Key: FLINK-18767
> URL: https://issues.apache.org/jira/browse/FLINK-18767
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.3, 1.10.1, 1.11.1
>Reporter: Nico Kruber
>Assignee: Zhijiang
>Priority: Critical
>
> The following code is stuck sending data from the source to the map operator. 
> Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} 
> and {{env.disableOperatorChaining();}} - if I remove either of these, the job 
> works as expected.
> (I pre-populated my Kafka topic with one element to reproduce easily)
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // comment either these two and the job works
> env.setBufferTimeout(-1);
> env.disableOperatorChaining(); 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", 
> new SimpleStringSchema(), properties);
> consumer.setStartFromEarliest();
> DataStreamSource<String> input = env.addSource(consumer);
> input
> .map((x) -> x)
> .print();
> env.execute();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18767) Streaming job stuck when disabling operator chaining

2020-08-02 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-18767:
-
Affects Version/s: (was: 1.8.3)

> Streaming job stuck when disabling operator chaining
> 
>
> Key: FLINK-18767
> URL: https://issues.apache.org/jira/browse/FLINK-18767
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.3, 1.10.1, 1.11.1
>Reporter: Nico Kruber
>Assignee: Zhijiang
>Priority: Critical
>
> The following code is stuck sending data from the source to the map operator. 
> Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} 
> and {{env.disableOperatorChaining();}} - if I remove either of these, the job 
> works as expected.
> (I pre-populated my Kafka topic with one element to reproduce easily)
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // comment either these two and the job works
> env.setBufferTimeout(-1);
> env.disableOperatorChaining(); 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", 
> new SimpleStringSchema(), properties);
> consumer.setStartFromEarliest();
> DataStreamSource<String> input = env.addSource(consumer);
> input
> .map((x) -> x)
> .print();
> env.execute();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18767) Streaming job stuck when disabling operator chaining

2020-07-31 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18767:


Assignee: Zhijiang

> Streaming job stuck when disabling operator chaining
> 
>
> Key: FLINK-18767
> URL: https://issues.apache.org/jira/browse/FLINK-18767
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.8.3, 1.9.3, 1.10.1, 1.11.1
>Reporter: Nico Kruber
>Assignee: Zhijiang
>Priority: Critical
>
> The following code is stuck sending data from the source to the map operator. 
> Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} 
> and {{env.disableOperatorChaining();}} - if I remove either of these, the job 
> works as expected.
> (I pre-populated my Kafka topic with one element to reproduce easily)
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // comment either these two and the job works
> env.setBufferTimeout(-1);
> env.disableOperatorChaining(); 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", 
> new SimpleStringSchema(), properties);
> consumer.setStartFromEarliest();
> DataStreamSource<String> input = env.addSource(consumer);
> input
> .map((x) -> x)
> .print();
> env.execute();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-07-30 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168391#comment-17168391
 ] 

Zhijiang edited comment on FLINK-18695 at 7/31/20, 4:40 AM:


{quote}
How problematic would it be to backport this to older versions? (1.10, maybe 
even 1.9?)
{quote}

I think it depends on the experiment results. If we confirm that the heap 
memory overhead after upgrading is trivial, then it is simple to backport to 
older versions. Otherwise we need to estimate the other dependencies.

I have already synced with [~gaoyunhaii] and he is willing to help handle it. 
Since he is on vacation now, he can probably work on it after next week. I 
will assign this ticket to him.


was (Author: zjwang):
{quot}
How problematic would it be to backport this to older versions? (1.10, maybe 
even 1.9?)
{quot}

I think it depends on the experiment results. If we confirm that the heap 
memory overhead after upgrading is trivial, then it is simple to backport to 
older versions. Otherwise we need to estimate the other dependencies.

I have already synced with [~gaoyunhaii] and he is willing to help handle it. 
Since he is on vacation now, he can probably work on it after next week. I 
will assign this ticket to him.

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43, netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-07-30 Thread Zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang reassigned FLINK-18695:


Assignee: Yun Gao

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43, netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-07-30 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168391#comment-17168391
 ] 

Zhijiang commented on FLINK-18695:
--

{quote}
How problematic would it be to backport this to older versions? (1.10, maybe 
even 1.9?)
{quote}

I think it depends on the experiment results. If we confirm that the heap 
memory overhead after upgrading is trivial, then it is simple to backport to 
older versions. Otherwise we need to estimate the other dependencies.

I have already synced with [~gaoyunhaii] and he is willing to help handle it. 
Since he is on vacation now, he can probably work on it after next week. I 
will assign this ticket to him.

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43, netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-07-30 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167650#comment-17167650
 ] 

Zhijiang edited comment on FLINK-18695 at 7/30/20, 5:59 AM:


{quote}
Do we actually also cover the receiving side and use our own memory segments 
deep in the Netty receiver stack?
{quote}
Yes, we resolved it in release-1.11 by 
[FLINK-10742|https://issues.apache.org/jira/browse/FLINK-10742], [~NicoK]

{quote}
Allowing netty to use heap buffers is quite easy (just remove the overridden 
methods in our NettyBufferPool), but what other changes would be required? Would 
we also have to increase the number of heap arenas (from 0)?
{quote}
Yes, we should adjust the number of heap arenas to match the number of direct 
arenas, because heap buffer allocation relies on the heap arenas.

{quote}
How difficult is it to adjust the memory model to account for this?
{quote}

After further checking the code, I found that our previous improvement of netty 
memory in FLINK-10742 did not cover the SSL case. The pipeline on the receiver 
side is SslHandler -> NettyMessageDecoder. FLINK-10742 only reduced the netty 
memory overhead in the message decoder, but the SslHandler still allocates 
netty memory via the internal netty allocator. As we have never used SSL mode 
in our production before, I am not quite sure about the effect of this portion 
of memory overhead in practice.

Another concern is that if we configure separate heap arenas for SSL, the total 
memory overhead might be larger compared with reusing the direct arenas as 
before. We may need to supplement some framework heap memory for this overhead 
when upgrading. Anyway, I think it would be safer to run some experiments to 
get insights, if possible, before upgrading directly. [~chesnay]
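
To make the arena point concrete, here is a minimal sketch assuming plain 
Netty 4.1's {{PooledByteBufAllocator}} rather than Flink's {{NettyBufferPool}} 
wrapper; the parameter values are illustrative, not measured recommendations. 
With zero heap arenas the pooled allocator falls back to unpooled heap 
buffers, so enabling pooled heap allocations for the SslHandler means giving 
the allocator heap arenas:

{code}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class HeapArenaSketch {
    public static void main(String[] args) {
        int arenas = Runtime.getRuntime().availableProcessors() * 2;
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                true,    // preferDirect
                arenas,  // nHeapArena: non-zero, so heapBuffer() is pooled
                arenas,  // nDirectArena: same count, mirroring the suggestion above
                8192,    // pageSize (Netty default)
                11);     // maxOrder (chunk size = 8192 << 11 = 16 MiB)
        ByteBuf heap = allocator.heapBuffer(1024);
        System.out.println("heap buffer capacity: " + heap.capacity());
        heap.release();
    }
}
{code}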


was (Author: zjwang):
{quote}
Do we actually also cover the receiving side and use our own memory segments 
deep in the Netty receiver stack?
{quote}
Yes, we resolved it in release-1.11 by 
[FLINK-10742|https://issues.apache.org/jira/browse/FLINK-10742], [~NicoK]

{quote}
Allowing netty to use heap buffers is quite easy (just remove the overridden 
methods in our NettyBufferPool), but what other changes would be required? Would 
we also have to increase the number of heap arenas (from 0)?
{quote}
Yes, we should adjust the number of heap arenas to match the number of direct 
arenas, because heap buffer allocation relies on the heap arenas.

{quote}
How difficult is it to adjust the memory model to account for this?
{quote}

After further checking the code, I found that our previous improvement of netty 
memory in FLINK-10742 did not cover the SSL case. The pipeline on the receiver 
side is SslHandler -> NettyMessageDecoder. FLINK-10742 only reduced the netty 
memory overhead in the message coder, but the SslHandler still allocates memory 
via the internal netty allocator. As we have never used SSL mode in our 
production before, I am not quite sure about the effect of this portion of 
memory overhead in practice.

Another concern is that if we configure separate heap arenas for SSL, the total 
memory overhead might be larger compared with reusing the direct arenas as 
before. We may need to supplement some framework heap memory for this overhead 
when upgrading. Anyway, I think it would be safer to run some experiments to 
get insights, if possible, before upgrading directly. [~chesnay]

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43, netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   5   6   7   8   9   10   >