[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-25 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:

Description: 
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue (our internal MQ) and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
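To make the race easier to see, here is a toy, self-contained illustration of the hazard (illustrative only, not Flink code): a deserializer that always returns the same mutable instance, combined with a queue that is drained later, makes earlier records disappear.
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Toy reproduction of the reuse hazard described above (not Flink code).
public class ReuseHazardDemo {

    // Mimics a format whose deserialize() always returns the same mutable object.
    static final class ReusingDeserializer {
        private final StringBuilder reuse = new StringBuilder();

        StringBuilder deserialize(String raw) {
            reuse.setLength(0);
            reuse.append(raw);
            return reuse; // same instance on every call
        }
    }

    public static void main(String[] args) {
        ReusingDeserializer format = new ReusingDeserializer();
        Queue<StringBuilder> elementQueue = new ArrayDeque<>();

        // "Fetcher thread": deserializes two records before the consumer drains the queue.
        elementQueue.add(format.deserialize("record-1"));
        elementQueue.add(format.deserialize("record-2")); // overwrites record-1 in place

        // "Main thread": both queued elements now hold the last value, so record-1 is lost.
        while (!elementQueue.isEmpty()) {
            System.out.println(elementQueue.poll()); // prints "record-2" twice
        }
    }
}
{code}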
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should remove the object reuse in RawFormatDeserializationSchema.
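A minimal sketch of the direction proposed above (illustrative only, not the actual Flink change): allocate a fresh row per deserialize call instead of mutating a shared reuse instance, so records already sitting in the ElementQueue can no longer be overwritten.
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Illustrative sketch only, not the real RawFormatDeserializationSchema:
// return a new row per record instead of reusing one shared instance.
public class NonReusingRawDeserializerSketch {

    public RowData deserialize(byte[] message) {
        GenericRowData row = new GenericRowData(1); // new object per call
        row.setField(0, message); // e.g. a single BYTES column; real code converts per the declared type
        return row;
    }
}
{code}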

  was:
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the 

[jira] [Comment Edited] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-25 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800273#comment-17800273
 ] 

Cai Liuyang edited comment on FLINK-33934 at 12/25/23 11:41 AM:


[~hackergin] Yeah, turning off object reuse for every DeserializationSchema used by the SourceOperator is the simplest way to avoid this data loss problem.

-- update --

After reviewing the Kafka source code: Kafka deserializes the ConsumerRecord into the real record object in the SourceOperator's main thread, so it does not hit this problem, see [KafkaRecordEmitter code|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L53C43-L53C43].

So this problem only appears when DeserializationSchema::deserialize is called in the fetcher thread.
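For readers unfamiliar with that code path, the safe pattern looks roughly like the sketch below (illustrative only, not the actual KafkaRecordEmitter): the raw payload is what crosses the element queue, and deserialization happens at emit time on the SourceOperator's main thread, so even a reused row is handed straight downstream before the next record is deserialized.
{code:java}
// Illustrative sketch of the "deserialize in the record emitter" pattern (not the real KafkaRecordEmitter).
public class MainThreadDeserializingEmitter<T> {

    public interface Deserializer<T> {
        T deserialize(byte[] payload);
    }

    public interface Output<T> {
        void collect(T record);
    }

    private final Deserializer<T> deserializer;

    public MainThreadDeserializingEmitter(Deserializer<T> deserializer) {
        this.deserializer = deserializer;
    }

    // Called on the SourceOperator's main thread for each element pulled from the element queue.
    public void emitRecord(byte[] rawPayload, Output<T> output) {
        output.collect(deserializer.deserialize(rawPayload)); // reuse is safe: consumed immediately
    }
}
{code}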

 


was (Author: cailiuyang):
[~hackergin] Yeah, turning off object reuse for every DeserializationSchema used by the SourceOperator is the simplest way to avoid this data loss problem.

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment we encountered a case that leads to data loss. The job setup: 
>    1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
>    2. The format of the source table is the raw format.
>  
> However, if we select the value field and a metadata field at the same time, the data loss does not appear.
>  
> After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
>     2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
>     3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
>     4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
>  
> The reason that selecting the value field and a metadata field at the same time does not lose data:
>    if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
>  
> To solve this problem, I think we should remove the object reuse in RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-25 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800273#comment-17800273
 ] 

Cai Liuyang commented on FLINK-33934:
-

[~hackergin] Yeah, turning off object reuse for every DeserializationSchema used by the SourceOperator is the simplest way to avoid this data loss problem.

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment we encountered a case that leads to data loss. The job setup: 
>    1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
>    2. The format of the source table is the raw format.
>  
> However, if we select the value field and a metadata field at the same time, the data loss does not appear.
>  
> After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
>     2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
>     3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
>     4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
>  
> The reason that selecting the value field and a metadata field at the same time does not lose data:
>    if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
>  
> To solve this problem, I think we should remove the object reuse in RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-24 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:

Description: 
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should disable the object reuse in RawFormatDeserializationSchema.

  was:
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the reused rawData that 

[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-24 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:

Description: 
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should remove the object reuse in RawFormatDeserializationSchema.

  was:
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the reused rawData that 

[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-24 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:

Description: 
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should disable the object reuse in RawFormatDeserializationSchema.

  was:
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the reused rawData that 

[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-24 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:

Description: 
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should disable the object reuse in RawFormatDeserializationSchema.

  was:
In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the reused rawData that 

[jira] [Created] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2023-12-24 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-33934:
---

 Summary: Flink SQL Source use raw format maybe lead to data lost
 Key: FLINK-33934
 URL: https://issues.apache.org/jira/browse/FLINK-33934
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Runtime
Reporter: Cai Liuyang


In our production environment we encountered a case that leads to data loss. The job setup: 
   1. A Flink SQL job reads data from a message queue and writes to Hive; it only selects the value field and does not reference any metadata field.
   2. The format of the source table is the raw format.
 
However, if we select the value field and a metadata field at the same time, the data loss does not appear.
 
After reviewing the code, we found that the root cause is the object reuse in the raw format (see 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
    2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
    3. RawFormatDeserializationSchema's deserialize function always returns the same object (see the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
    4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
 
The reason that selecting the value field and a metadata field at the same time does not lose data:
   if a metadata field is selected, a new RowData object is produced per record (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
 
To solve this problem, I think we should disable the object reuse in RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33759:

Description: 
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
illegal, the field should be ommited completely instead
    at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
and 
[MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
do not implement the `public void write(ArrayData arrayData, int ordinal)` method.
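A rough sketch of the kind of element-level hook this report asks for is shown below (hypothetical; the real ParquetRowDataWriter internals, repetition levels, and null handling may differ): an ArrayWriter-like class whose write(ArrayData, int) forwards the nested container at that position to the existing container-level logic.
{code:java}
import org.apache.flink.table.data.ArrayData;

// Toy illustration of the missing hook (not the real ParquetRowDataWriter.ArrayWriter).
class NestedArrayWriterSketch {

    // Container-level path that already exists in the real writer (see the stack trace above).
    void writeArrayData(ArrayData array) {
        System.out.println("writing array with " + array.size() + " elements");
    }

    // The element-level hook the report says is missing for nested ARRAY/MAP columns.
    void write(ArrayData parent, int ordinal) {
        if (!parent.isNullAt(ordinal)) {
            writeArrayData(parent.getArray(ordinal)); // nested array stored at this position
        }
    }
}
{code}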

  was:
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
illegal, the field should be ommited completely instead
    at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.


> flink parquet writer support write nested array or map type
> ---
>
> Key: FLINK-33759
> URL: https://issues.apache.org/jira/browse/FLINK-33759
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Cai Liuyang
>Priority: Major
>
> When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
> {code:java}
> // code placeholder
> Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
> illegal, the field should be ommited completely instead
>     at 
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code}
> After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
> [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
> and 
> [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
> do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33759:

Description: 
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
illegal, the field should be ommited completely instead
    at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81)
    at 
org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.

  was:
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
field should be ommited completely instead {code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.


> flink parquet writer support write nested array or map type
> ---
>
> Key: FLINK-33759
> URL: https://issues.apache.org/jira/browse/FLINK-33759
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Cai Liuyang
>Priority: Major
>
> When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
> {code:java}
> // code placeholder
> Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
> illegal, the field should be ommited completely instead
>     at 
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81)
>     at 
> org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code}
> After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33759:

Description: 
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
field should be ommited completely instead {code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.

  was:
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
field should be ommited completely instead {code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.


> flink parquet writer support write nested array or map type
> ---
>
> Key: FLINK-33759
> URL: https://issues.apache.org/jira/browse/FLINK-33759
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Cai Liuyang
>Priority: Major
>
> When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
> {code:java}
> // code placeholder
> org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
> field should be ommited completely instead {code}
> After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33759:

Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: 
`org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. 
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
and 
[MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
do not implement the `public void write(ArrayData arrayData, int ordinal)` method.  (was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: 
`org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. 
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
and 
[MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
do not implement the `public void write(ArrayData arrayData, int ordinal)` method.)

> flink parquet writer support write nested array or map type
> ---
>
> Key: FLINK-33759
> URL: https://issues.apache.org/jira/browse/FLINK-33759
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Cai Liuyang
>Priority: Major
>
> When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: 
> `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. 
> After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
> [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
> and 
> [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
> do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33759:

Description: 
When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
{code:java}
// code placeholder
org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
field should be ommited completely instead {code}
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.

  was:When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: 
`org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. 
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
and 
[MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
do not implement the `public void write(ArrayData arrayData, int ordinal)` method.


> flink parquet writer support write nested array or map type
> ---
>
> Key: FLINK-33759
> URL: https://issues.apache.org/jira/browse/FLINK-33759
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Cai Liuyang
>Priority: Major
>
> When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception:
> {code:java}
> // code placeholder
> org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the 
> field should be ommited completely instead {code}
> After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because [ArrayWriter|#L437] and [MapWriter|#L391] do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-33759:
---

 Summary: flink parquet writer support write nested array or map 
type
 Key: FLINK-33759
 URL: https://issues.apache.org/jira/browse/FLINK-33759
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Cai Liuyang


When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: 
`org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. 
After reviewing the code, we found that flink-parquet does not support writing nested array or map types, because 
[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] 
and 
[MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] 
do not implement the `public void write(ArrayData arrayData, int ordinal)` method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-16 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733451#comment-17733451
 ] 

Cai Liuyang commented on FLINK-32362:
-

[~fanrui] I compared the two ways. In the common case, if one subtask is not ready, the other subtasks are very likely not ready either, so the inner catch would print many useless logs (all with the same reason). That is why I chose the outer catch.
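For context, the inner-catch variant being compared here would look roughly like the fragment below (illustrative only, mirroring the snippet in the earlier comment): it logs once per unready subtask and keeps looping, which is why it can produce many near-identical warnings when most subtasks are restarting at the same time.
{code:java}
// Illustrative inner-catch variant (one warning per unready subtask, the loop keeps going).
for (Integer subtaskId : subTaskIds) {
    try {
        context.sendEventToSourceOperator(
                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    } catch (Throwable t) {
        LOG.warn("Failed to announce the combined watermark to subtask {}; it may be in failover.",
                subtaskId, t);
    }
}
{code}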

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> When we use source alignment, we also found another problem: announceCombinedWatermark may throw an exception (such as "subtask 25 is not ready yet to receive events", because that subtask may be in failover), which stops the periodic task from running any more (the ThreadPoolExecutor will not schedule the periodic task again once it throws an exception).
> I think we should make the announceCombinedWatermark function robust enough that it never throws (if a send fails, just wait for the next round); see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-16 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733349#comment-17733349
 ] 

Cai Liuyang commented on FLINK-32362:
-

Two ways:
 # the simple way, with code like:
{code:java}
try {
    for (Integer subtaskId : subTaskIds) {
        context.sendEventToSourceOperator(
            subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
} catch (Throwable t) {
    LOG.warn(
        "Announcing the newest combined watermark to the sources failed, the task "
            + "may be during failover; wait for the next announcement.",
        t);
}{code}

 # add a trySendEventToSourceOperator() method that only sends when the task is 
ready and does not throw an exception when the task is not ready (a rough sketch 
of this is below).

I prefer the first one, because it is simple and it also covers other exceptions 
such as RPC timeouts.
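For reference, a hedged sketch of what the second option could look like. This is not 
an existing Flink method; it assumes the coordinator context can report which subtasks 
are currently registered (here via registeredReaders()), and it keeps a per-subtask 
catch as a fallback for races between the check and the send:
{code:java}
// Hypothetical helper inside SourceCoordinator -- a sketch of option 2, not a patch.
private void trySendEventToSourceOperator(int subtaskId, OperatorEvent event) {
    if (!context.registeredReaders().containsKey(subtaskId)) {
        // subtask not registered yet (e.g. still deploying or failing over): skip it
        return;
    }
    try {
        context.sendEventToSourceOperator(subtaskId, event);
    } catch (Throwable t) {
        LOG.debug(
            "Failed to send {} to subtask {}, will retry on the next announcement.",
            event, subtaskId, t);
    }
}{code}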

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>
> When we use sourcealignment,we also found there is another problem that 
> announceCombinedWatermark may throw a exception (like  "subtask 25 is not 
> ready yet to receive events" , this subtask maybe under failover), which will 
> lead the period task not running any more (ThreadPoolExecutor will not 
> schedule the period task if it throw a exception)
> I think we should increase the robustness of announceCombinedWatermark 
> function to avoid it throw any exception (if send fail, just wait next send) 
> (code see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-16 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733349#comment-17733349
 ] 

Cai Liuyang edited comment on FLINK-32362 at 6/16/23 6:38 AM:
--

[~fanrui] Two ways:
 # the simple way, with code like:
{code:java}
try {
    for (Integer subtaskId : subTaskIds) {
        context.sendEventToSourceOperator(
            subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
} catch (Throwable t) {
    LOG.warn(
        "Announcing the newest combined watermark to the sources failed, the task "
            + "may be during failover; wait for the next announcement.",
        t);
}{code}

 # add a trySendEventToSourceOperator() method that only sends when the task is 
ready and does not throw an exception when the task is not ready.

I prefer the first one, because it is simple and it also covers other exceptions 
such as RPC timeouts.


was (Author: cailiuyang):
Two ways:
 # the simple way code like:
{code:java}
// code placeholder
try {
    for (Integer subtaskId : subTaskIds) {
        context.sendEventToSourceOperator(
              subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
 } catch (Throwable ignore) {
LOG.warn("Announce the newest combined watermark to source failed, task 
maybe during failover, wait next time to announce."
 }{code}

 # add a trySendEventToSourceOperator(), this method only send when task is 
ready and not throw exception if task is not ready.

i prefer the first one, because it's simple and it can also cover some other 
exception like rpc timeout 

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>
> When we use sourcealignment,we also found there is another problem that 
> announceCombinedWatermark may throw a exception (like  "subtask 25 is not 
> ready yet to receive events" , this subtask maybe under failover), which will 
> lead the period task not running any more (ThreadPoolExecutor will not 
> schedule the period task if it throw a exception)
> I think we should increase the robustness of announceCombinedWatermark 
> function to avoid it throw any exception (if send fail, just wait next send) 
> (code see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-16 Thread Cai Liuyang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32362 ]


Cai Liuyang deleted comment on FLINK-32362:
-

was (Author: cailiuyang):
[~fanrui] Two ways:

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>
> When we use sourcealignment,we also found there is another problem that 
> announceCombinedWatermark may throw a exception (like  "subtask 25 is not 
> ready yet to receive events" , this subtask maybe under failover), which will 
> lead the period task not running any more (ThreadPoolExecutor will not 
> schedule the period task if it throw a exception)
> I think we should increase the robustness of announceCombinedWatermark 
> function to avoid it throw any exception (if send fail, just wait next send) 
> (code see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-16 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733345#comment-17733345
 ] 

Cai Liuyang commented on FLINK-32362:
-

[~fanrui] Two ways:

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>
> When we use sourcealignment,we also found there is another problem that 
> announceCombinedWatermark may throw a exception (like  "subtask 25 is not 
> ready yet to receive events" , this subtask maybe under failover), which will 
> lead the period task not running any more (ThreadPoolExecutor will not 
> schedule the period task if it throw a exception)
> I think we should increase the robustness of announceCombinedWatermark 
> function to avoid it throw any exception (if send fail, just wait next send) 
> (code see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-15 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task gets scheduled after a JobManager failover that 
automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager fails over and automatically recovers the job, 
it creates the SourceCoordinator twice (a rough sketch of the proposed change 
follows this description):
 * The first one: when the JobMaster is created, it creates the 
DefaultExecutionGraph; this initializes the first sourceCoordinator but does not 
start it.
 * The second one: the JobMaster calls the restoreLatestCheckpointedStateInternal 
method, which resets the old sourceCoordinator and initializes a new one; but 
because the first sourceCoordinator was never started (a SourceCoordinator is 
started before SchedulerBase::startScheduling), the first SourceCoordinator is 
not fully closed.

 

And we also found another problem, see jira: 
https://issues.apache.org/jira/browse/FLINK-32362
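A rough, hypothetical sketch of the scheduling change (simplified standalone code, not 
the real SourceCoordinator; the class name below is made up): the periodic announcement 
is registered in start() rather than in the constructor, so an instance that is created 
during recovery but never started schedules nothing.
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified illustration only -- not the real SourceCoordinator.
class AlignmentCoordinatorSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announcementTask;

    // nothing is scheduled in the constructor, so an un-started instance is inert
    void start(long periodMillis, Runnable announceCombinedWatermark) {
        announcementTask = executor.scheduleAtFixedRate(
                announceCombinedWatermark, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void close() {
        if (announcementTask != null) {
            announcementTask.cancel(false);
        }
        executor.shutdownNow();
    }
}{code}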

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

-And we also found there is another problem that announceCombinedWatermark may 
throw a exception (like  "subtask 25 is not ready yet to receive events" , this 
subtask maybe under failover) lead the period task not running any more 
(ThreadPoolExecutor will not schedule the period task if it throw a exception), 
i think we should increase the robustness of announceCombinedWatermark function 
to cover this case (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )-

(see jira: https://issues.apache.org/jira/browse/FLINK-32362)


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  
> And we also found another problem see jira: 
> https://issues.apache.org/jira/browse/FLINK-32362



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-15 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

-And we also found there is another problem that announceCombinedWatermark may 
throw a exception (like  "subtask 25 is not ready yet to receive events" , this 
subtask maybe under failover) lead the period task not running any more 
(ThreadPoolExecutor will not schedule the period task if it throw a exception), 
i think we should increase the robustness of announceCombinedWatermark function 
to cover this case (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )-

(see jira: https://issues.apache.org/jira/browse/FLINK-32362)

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

And we also found there is another problem that announceCombinedWatermark may 
throw a exception (like  "subtask 25 is not ready yet to receive events" , this 
subtask maybe under failover) lead the period task not running any more 
(ThreadPoolExecutor will not schedule the period task if it throw a exception), 
i think we should increase the robustness of announceCombinedWatermark function 
to cover this case (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old 

[jira] [Created] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-06-15 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-32362:
---

 Summary: SourceAlignment announceCombinedWatermark period task 
maybe lost
 Key: FLINK-32362
 URL: https://issues.apache.org/jira/browse/FLINK-32362
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Cai Liuyang


When we use source alignment, we also found another problem: 
announceCombinedWatermark may throw an exception (like "subtask 25 is not ready 
yet to receive events"; that subtask may be failing over), which will stop the 
periodic task from running any more (a ScheduledExecutorService suppresses all 
subsequent executions of a periodic task once one execution throws an exception; 
see the small demo below).

I think we should increase the robustness of the announceCombinedWatermark 
function so that it does not throw any exception (if a send fails, just wait for 
the next send); see the code at 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
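A minimal standalone demo (plain JDK code, not Flink) of the executor behavior 
described above: after the first execution throws, no further runs are scheduled.
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTaskLostDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(
                () -> {
                    System.out.println("announce at " + System.currentTimeMillis());
                    // simulates "subtask X is not ready yet to receive events"
                    throw new IllegalStateException("subtask not ready yet");
                },
                0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(1000); // only one "announce" line is printed
        executor.shutdownNow();
    }
}{code}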



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-15 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733278#comment-17733278
 ] 

Cai Liuyang commented on FLINK-32316:
-

[~fanrui] OK, please assign it to me, thanks~

> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  
> And we also found there is another problem that announceCombinedWatermark may 
> throw a exception (like  "subtask 25 is not ready yet to receive events" , 
> this subtask maybe under failover) lead the period task not running any more 
> (ThreadPoolExecutor will not schedule the period task if it throw a 
> exception), i think we should increase the robustness of 
> announceCombinedWatermark function to cover this case (see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

And we also found there is another problem that announceCombinedWatermark may 
throw a exception (like  "subtask 25 is not ready yet to receive events" , this 
subtask maybe under failover) lead the period task not running any more 
(ThreadPoolExecutor will not schedule the period task if it throw a exception), 
i think we should increase the robustness of announceCombinedWatermark function 
to cover this case (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  
> And we also found there is another problem that announceCombinedWatermark may 
> throw a exception (like  "subtask 25 is not ready yet to receive events" , 
> this subtask maybe under failover) lead the period task not running any more 
> (ThreadPoolExecutor will not schedule the period task if it throw a 
> exception), i think we should increase the robustness of 
> announceCombinedWatermark 

[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover, and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover, and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover, and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is create it will create the 
DefaultExecutionGraph.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling, so the first SourceCoordinator 
will not be fully closed).

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover, 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731608#comment-17731608
 ] 

Cai Liuyang commented on FLINK-32316:
-

[~pnowojski] Could you please take a look? Thanks~

> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover, 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is create it will create the 
> DefaultExecutionGraph.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling, so the first 
> SourceCoordinator will not be fully closed).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-32316:
---

 Summary: Duplicated announceCombinedWatermark task maybe scheduled 
if jobmanager failover
 Key: FLINK-32316
 URL: https://issues.apache.org/jira/browse/FLINK-32316
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Cai Liuyang


When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task gets scheduled after a JobManager failover that 
automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager fails over and automatically recovers the job, 
it creates the SourceCoordinator twice:
 * The first one: when the JobMaster is created, it creates the 
DefaultExecutionGraph.
 * The second one: the JobMaster calls the restoreLatestCheckpointedStateInternal 
method, which resets the old sourceCoordinator and initializes a new one; but 
because the first sourceCoordinator was never started (a SourceCoordinator is 
started before SchedulerBase::startScheduling), the first SourceCoordinator is 
not fully closed.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2023-05-18 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang closed FLINK-29577.
---
Resolution: Fixed

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-20-16-08-15-746.png, 
> image-2022-10-20-16-11-04-211.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-20 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620889#comment-17620889
 ] 

Cai Liuyang edited comment on FLINK-29577 at 10/20/22 8:11 AM:
---

[~ym] 

This is my test case:

1. disable RocksDB managed memory
2. use the RocksDB full snapshot strategy
3. the KeyProcessor has one subtask and five states (state1, state2, state3, state4, 
state5); the total state size is 2GB
4. state1 is small: its total size is less than the RocksDB default memtable 
size (64M), e.g. state1 has just one record

After restoring, the RocksDB WAL cannot be GCed (see png1). The reason, I guess, is 
that state1's memtable is never flushed, and the WAL can only be GCed once state1's 
memtable has been flushed (the RocksDB full snapshot only takes a snapshot).

During my test, I also found that if state1 ~ state4 are big but state5 is small 
(only one record), then there is only one WAL file (see png2).

In my test, disabling the WAL doesn't improve restore speed; the two cases 
(WAL disabled / enabled during restore) are almost the same.

 

!image-2022-10-20-16-08-15-746.png!

!image-2022-10-20-16-11-04-211.png!


was (Author: cailiuyang):
[~ym] 

This is my test case:

1. disable rocksdb managed memory
2. use rocksdb full snapshot strategy
3. KeyProcessor have one subtask, five state(state1, state2, state3, state4, 
state5), total state size is 2GB
4. the size of state1 is small, the total size of state1 is less than rocksdb 
default memtable size(64M), such as state1 just have one record

after restoring, rocksdb wal can not be gc(see the png1), the reason i guess is 
state1's memtable doesn't flushed, and only by gced until state1's memtable be 
flushed (Rocksdb Full snapshot only take a snapshot).

During my test, I found if state1 ~ state4 size is big but state5 is small(only 
have one record), then there only be one wal file(see png2)


In my test, disable wal doesn't imporve restore speed, two case(disable / 
enable wal during restore) is almost the same.

 

!image-2022-10-20-16-08-15-746.png!!image-2022-10-20-16-08-54-359.png!

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-20-16-08-15-746.png, 
> image-2022-10-20-16-11-04-211.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-20 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620889#comment-17620889
 ] 

Cai Liuyang commented on FLINK-29577:
-

[~ym] 

This is my test case:

1. disable rocksdb managed memory
2. use rocksdb full snapshot strategy
3. KeyProcessor have one subtask, five state(state1, state2, state3, state4, 
state5), total state size is 2GB
4. the size of state1 is small, the total size of state1 is less than rocksdb 
default memtable size(64M), such as state1 just have one record

after restoring, rocksdb wal can not be gc(see the png1), the reason i guess is 
state1's memtable doesn't flushed, and only by gced until state1's memtable be 
flushed (Rocksdb Full snapshot only take a snapshot).

During my test, I found if state1 ~ state4 size is big but state5 is small(only 
have one record), then there only be one wal file(see png2)


In my test, disable wal doesn't imporve restore speed, two case(disable / 
enable wal during restore) is almost the same.

 

!image-2022-10-20-16-08-15-746.png!!image-2022-10-20-16-08-54-359.png!

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-20-16-08-15-746.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-20 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-29577:

Attachment: image-2022-10-20-16-08-15-746.png

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-20-16-08-15-746.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-29577:

Attachment: (was: image-2022-10-12-20-15-11-219.png)

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-29577:

Attachment: image-2022-10-12-20-15-11-219.png

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-12-20-15-11-219.png
>
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation does's pass RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restore kv-data, which will use 
> RocksDBWriteBatchWrapper‘s default WriteOptions(doesn't disable rocksdb wal 
> explicitly, see code below), so during restoring from full snapshot, wal is 
> enabled(use more disk and maybe affect rocksdb-write-performance when 
> restoring)
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
> pass WriteOptions to RocksDBWriteBatchWrapper(null as default)
> private void restoreKVStateData(
> ThrowingIterator keyGroups,
> Map columnFamilies,
> Map> 
> restoredPQStates)
> throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
> try (RocksDBWriteBatchWrapper writeBatchWrapper =
> new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
> writeBatchSize)) {
> HeapPriorityQueueSnapshotRestoreWrapper 
> restoredPQ = null;
> ColumnFamilyHandle handle = null;
>..
> }
> // Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
> explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
> if (options != null) {
> db.write(options, batch);
> } else {
> // use the default WriteOptions, if wasn't provided.
> try (WriteOptions writeOptions = new WriteOptions()) {
> db.write(writeOptions, batch);
> }
> }
> batch.clear();
> }
> {code}
>  
>  
> As we known, rocksdb's wal is usesless for flink, so i think we can disable 
> wal for RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-11 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-29577:

Description: 
For now, RocksDBFullRestoreOperation and RocksDBHeapTimersFullRestoreOperation 
don't pass a RocksDB WriteOptions to RocksDBWriteBatchWrapper when restoring 
kv-data, so RocksDBWriteBatchWrapper's default WriteOptions is used (it does not 
disable the RocksDB WAL explicitly, see the code below). As a result, during a 
restore from a full snapshot the WAL is enabled, which uses more disk and may 
affect RocksDB write performance while restoring.

 
{code:java}
// First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't 
pass WriteOptions to RocksDBWriteBatchWrapper(null as default)

private void restoreKVStateData(
ThrowingIterator keyGroups,
Map columnFamilies,
Map> 
restoredPQStates)
throws IOException, RocksDBException, StateMigrationException {
// for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
writeBatchSize)) {
HeapPriorityQueueSnapshotRestoreWrapper 
restoredPQ = null;
ColumnFamilyHandle handle = null;
   ..
}


// Second: RocksDBWriteBatchWrapper::flush function doesn't disable wal 
explicitly when user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
public void flush() throws RocksDBException {
if (options != null) {
db.write(options, batch);
} else {
// use the default WriteOptions, if wasn't provided.
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, batch);
}
}
batch.clear();
}

{code}
 

 

As we know, RocksDB's WAL is useless for Flink, so I think we can disable the WAL 
in RocksDBWriteBatchWrapper's default WriteOptions (a rough sketch follows).
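A minimal sketch of what the default path could look like, assuming the flush() shape 
quoted above (simplified, not a verbatim patch); WriteOptions.setDisableWAL(true) is 
the standard RocksDB Java API for turning the WAL off for these writes:
{code:java}
public void flush() throws RocksDBException {
    if (options != null) {
        db.write(options, batch);
    } else {
        // build default WriteOptions with the WAL explicitly disabled,
        // since Flink never relies on the RocksDB WAL for recovery
        try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
            db.write(writeOptions, batch);
        }
    }
    batch.clear();
}{code}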

  was:
For now, RocksDBFullRestoreOperation and RocksDBHeapTimersFullRestoreOperation 
don't pass a RocksDB::WriteOptions to RocksDBWriteBatchWrapper when restoring 
kv-data, so the restore falls back to RocksDBWriteBatchWrapper's default 
WriteOptions (which does not disable the RocksDB WAL explicitly, see the code 
below). During a restore from a full snapshot the WAL is therefore enabled, 
which uses extra disk space and affects RocksDB write performance while 
restoring.

 
{code:java}
// First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
// pass WriteOptions to RocksDBWriteBatchWrapper (null by default)
private void restoreKVStateData(
        ThrowingIterator<KeyGroup> keyGroups,
        Map<Integer, ColumnFamilyHandle> columnFamilies,
        Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
        throws IOException, RocksDBException, StateMigrationException {
    // for all key-groups in the current state handle...
    try (RocksDBWriteBatchWrapper writeBatchWrapper =
            new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
        HeapPriorityQueueSnapshotRestoreWrapper<?> restoredPQ = null;
        ColumnFamilyHandle handle = null;
        ..
}

// Second: RocksDBWriteBatchWrapper::flush() doesn't disable the wal explicitly
// when the user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
public void flush() throws RocksDBException {
    if (options != null) {
        db.write(options, batch);
    } else {
        // use the default WriteOptions, if wasn't provided.
        try (WriteOptions writeOptions = new WriteOptions()) {
            db.write(writeOptions, batch);
        }
    }
    batch.clear();
}

{code}
 

 

As we know, RocksDB's WAL is useless for Flink, so I think we can disable the 
WAL in RocksDBWriteBatchWrapper's default WriteOptions.


> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation don't pass a RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restoring kv-data, so the restore falls back to 
> RocksDBWriteBatchWrapper's default WriteOptions (which does not disable the 
> RocksDB WAL explicitly, see the code below). During a restore from a full 
> snapshot the WAL is therefore enabled, which uses extra disk space and may 
> hurt RocksDB write performance while restoring.
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
> // pass WriteOptions to RocksDBWriteBatchWrapper (null by default)
> private void restoreKVStateData(
>         ThrowingIterator<KeyGroup> keyGroups,
>         Map<Integer, ColumnFamilyHandle> columnFamilies,
>         Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
>         throws IOException, RocksDBException, StateMigrationException {
> // for all key-groups in the current state handle...
>  

[jira] [Commented] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-11 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17615862#comment-17615862
 ] 

Cai Liuyang commented on FLINK-29577:
-

[~yunta] Sorry, I only observed that disk usage doubled during the restore; I 
didn't test write performance, but I think disabling the WAL may improve 
performance somewhat (if I'm wrong, please correct me).

> Disable rocksdb wal when restore from full snapshot
> ---
>
> Key: FLINK-29577
> URL: https://issues.apache.org/jira/browse/FLINK-29577
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> For now, RocksDBFullRestoreOperation and 
> RocksDBHeapTimersFullRestoreOperation don't pass a RocksDB::WriteOptions to 
> RocksDBWriteBatchWrapper when restoring kv-data, so the restore falls back to 
> RocksDBWriteBatchWrapper's default WriteOptions (which does not disable the 
> RocksDB WAL explicitly, see the code below). During a restore from a full 
> snapshot the WAL is therefore enabled, which uses extra disk space and 
> affects RocksDB write performance while restoring.
>  
> {code:java}
> // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
> // pass WriteOptions to RocksDBWriteBatchWrapper (null by default)
> private void restoreKVStateData(
>         ThrowingIterator<KeyGroup> keyGroups,
>         Map<Integer, ColumnFamilyHandle> columnFamilies,
>         Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
>         throws IOException, RocksDBException, StateMigrationException {
>     // for all key-groups in the current state handle...
>     try (RocksDBWriteBatchWrapper writeBatchWrapper =
>             new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
>         HeapPriorityQueueSnapshotRestoreWrapper<?> restoredPQ = null;
>         ColumnFamilyHandle handle = null;
>         ..
> }
>
> // Second: RocksDBWriteBatchWrapper::flush() doesn't disable the wal explicitly
> // when the user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
> public void flush() throws RocksDBException {
>     if (options != null) {
>         db.write(options, batch);
>     } else {
>         // use the default WriteOptions, if wasn't provided.
>         try (WriteOptions writeOptions = new WriteOptions()) {
>             db.write(writeOptions, batch);
>         }
>     }
>     batch.clear();
> }
> {code}
>  
>  
> As we know, RocksDB's WAL is useless for Flink, so I think we can disable the 
> WAL in RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29577) Disable rocksdb wal when restore from full snapshot

2022-10-11 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-29577:
---

 Summary: Disable rocksdb wal when restore from full snapshot
 Key: FLINK-29577
 URL: https://issues.apache.org/jira/browse/FLINK-29577
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Cai Liuyang


For now, RocksDBFullRestoreOperation and RocksDBHeapTimersFullRestoreOperation 
don't pass a RocksDB::WriteOptions to RocksDBWriteBatchWrapper when restoring 
kv-data, so the restore falls back to RocksDBWriteBatchWrapper's default 
WriteOptions (which does not disable the RocksDB WAL explicitly, see the code 
below). During a restore from a full snapshot the WAL is therefore enabled, 
which uses extra disk space and affects RocksDB write performance while 
restoring.

 
{code:java}
// First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
// pass WriteOptions to RocksDBWriteBatchWrapper (null by default)
private void restoreKVStateData(
        ThrowingIterator<KeyGroup> keyGroups,
        Map<Integer, ColumnFamilyHandle> columnFamilies,
        Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
        throws IOException, RocksDBException, StateMigrationException {
    // for all key-groups in the current state handle...
    try (RocksDBWriteBatchWrapper writeBatchWrapper =
            new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
        HeapPriorityQueueSnapshotRestoreWrapper<?> restoredPQ = null;
        ColumnFamilyHandle handle = null;
        ..
}

// Second: RocksDBWriteBatchWrapper::flush() doesn't disable the wal explicitly
// when the user doesn't pass WriteOptions to RocksDBWriteBatchWrapper
public void flush() throws RocksDBException {
    if (options != null) {
        db.write(options, batch);
    } else {
        // use the default WriteOptions, if wasn't provided.
        try (WriteOptions writeOptions = new WriteOptions()) {
            db.write(writeOptions, batch);
        }
    }
    batch.clear();
}

{code}
 

 

As we know, RocksDB's WAL is useless for Flink, so I think we can disable the 
WAL in RocksDBWriteBatchWrapper's default WriteOptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-17 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494359#comment-17494359
 ] 

Cai Liuyang commented on FLINK-26080:
-

Yeah, it looks like the same problem, thanks [~pnowojski], I'll read that issue 
carefully~

 

> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment, we encountered one abnormal case:
>     an upstream task is backpressured but all of its downstream tasks are 
> idle, and the job stays in this state until the checkpoint times out (aligned 
> checkpoints are used). After analysing this case, we found a half-open socket 
> (see [https://en.wikipedia.org/wiki/TCP_half-open]) that was already closed 
> on the server side but still established on the client side, which leads to 
> this: 
>     1. The NettyServer hits a ReadTimeoutException when reading data from the 
> channel, so it releases the NetworkSequenceViewReader (which is responsible 
> for sending data to the PartitionRequestClient) and writes an ErrorResponse 
> to the PartitionRequestClient. After the ErrorResponse is written 
> successfully, the server closes the channel (the socket moves to the 
> fin_wait1 state).
>     2. The PartitionRequestClient receives neither the ErrorResponse nor the 
> server's FIN, so it keeps the socket in the established state and waits for a 
> BufferResponse from the server (maybe a kernel bug on our machines caused the 
> ErrorResponse and FIN to be lost).
>     3. The server machine releases the socket if it stays in fin_wait1 for 
> too long, but the socket on the client machine remains established, which 
> leads to the half-open socket.
> To avoid this case, I think there are two methods:
>     1. The client enables TCP keep-alive (Flink already enables it): this 
> also requires adjusting the machine's tcp-keep-alive time (the default of 
> 7200 seconds is too long).
>     2. The client uses Netty's IdleStateHandler to detect whether the channel 
> is idle (for reads or writes); if the channel is idle, the client writes a 
> ping message to the server to check whether the channel is really ok.
> Of the two methods, I recommend method 2, because adjusting the machine's 
> tcp-keep-alive time would affect other services running on the same machine.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-02-11 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang closed FLINK-25664.
---
Resolution: Abandoned

> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Critical
>  Labels: pull-request-available
>
> For now, there might be a case like this:
>  # The PipelinedSubPartition has only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubPartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, we add two finished buffers 
> to this PipelinedSubPartition (there is no limit on adding buffers to a 
> blocked PipelinedSubPartition)
>  ## adding the first finished buffer will not notifyDataAvailable because 
> isBlocked == true
>  ## adding the second finished buffer will also not notifyDataAvailable 
> because isBlocked == true and finishedBuffer > 1
>  # The downstream calls resumeConsumption and the PipelinedSubPartition is 
> unblocked (isBlocked == false)
>  # The OutputFlusher's call on the PipelinedSubPartition will not 
> notifyDataAvailable because finishedBuffer > 1
> In conclusion, there are three cases in which we should trigger 
> notifyDataAvailable:
>     case1: only one finished buffer (handled by add)
>     case2: only one unfinished buffer (handled by flush)
>     case3: more than one finished buffer, added while the 
> PipelinedSubPartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-02-11 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490882#comment-17490882
 ] 

Cai Liuyang commented on FLINK-25664:
-

yeah, got it. I forgot this logic before, thanks ~

> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Critical
>  Labels: pull-request-available
>
> For now, there might be a case like this:
>  # The PipelinedSubPartition has only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubPartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, we add two finished buffers 
> to this PipelinedSubPartition (there is no limit on adding buffers to a 
> blocked PipelinedSubPartition)
>  ## adding the first finished buffer will not notifyDataAvailable because 
> isBlocked == true
>  ## adding the second finished buffer will also not notifyDataAvailable 
> because isBlocked == true and finishedBuffer > 1
>  # The downstream calls resumeConsumption and the PipelinedSubPartition is 
> unblocked (isBlocked == false)
>  # The OutputFlusher's call on the PipelinedSubPartition will not 
> notifyDataAvailable because finishedBuffer > 1
> In conclusion, there are three cases in which we should trigger 
> notifyDataAvailable:
>     case1: only one finished buffer (handled by add)
>     case2: only one unfinished buffer (handled by flush)
>     case3: more than one finished buffer, added while the 
> PipelinedSubPartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-02-11 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490882#comment-17490882
 ] 

Cai Liuyang edited comment on FLINK-25664 at 2/11/22, 12:20 PM:


yeah, got it. I forgot this logic before, thanks ~ [~pnowojski] 


was (Author: cailiuyang):
yeah, got it. I forgot this logic before, thanks ~

> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Critical
>  Labels: pull-request-available
>
> For now, there might be a case like this:
>  # The PipelinedSubPartition has only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubPartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, we add two finished buffers 
> to this PipelinedSubPartition (there is no limit on adding buffers to a 
> blocked PipelinedSubPartition)
>  ## adding the first finished buffer will not notifyDataAvailable because 
> isBlocked == true
>  ## adding the second finished buffer will also not notifyDataAvailable 
> because isBlocked == true and finishedBuffer > 1
>  # The downstream calls resumeConsumption and the PipelinedSubPartition is 
> unblocked (isBlocked == false)
>  # The OutputFlusher's call on the PipelinedSubPartition will not 
> notifyDataAvailable because finishedBuffer > 1
> In conclusion, there are three cases in which we should trigger 
> notifyDataAvailable:
>     case1: only one finished buffer (handled by add)
>     case2: only one unfinished buffer (handled by flush)
>     case3: more than one finished buffer, added while the 
> PipelinedSubPartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found a half-open socket 
(see [https://en.wikipedia.org/wiki/TCP_half-open]) that was already closed on 
the server side but still established on the client side, which leads to this: 

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient. After the ErrorResponse is written successfully, 
the server closes the channel (the socket moves to the fin_wait1 state).

    2. The PartitionRequestClient receives neither the ErrorResponse nor the 
server's FIN, so it keeps the socket in the established state and waits for a 
BufferResponse from the server (maybe a kernel bug on our machines caused the 
ErrorResponse and FIN to be lost).

    3. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established, which leads to 
the half-open socket.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.
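
A minimal sketch of method 2 (assuming a handler added to the client side of 
the PartitionRequest channel; IdleStateHandler and IdleStateEvent are real 
Netty APIs, while PingMessage and the 60-second threshold are only 
illustrative):

{code:java}
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

// Hypothetical probe message; Flink's NettyMessage set has no ping today.
final class PingMessage {}

public final class IdleChannelProbe {
    // Fire an IdleStateEvent if nothing is read for 60 seconds, then probe the
    // server; if the probe cannot be written, the channel is really broken and
    // we close it so the failure surfaces instead of waiting for the checkpoint
    // timeout.
    public static void install(ChannelPipeline pipeline) {
        pipeline.addLast("idle-detector", new IdleStateHandler(60, 0, 0));
        pipeline.addLast("idle-prober", new ChannelDuplexHandler() {
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                    throws Exception {
                if (evt instanceof IdleStateEvent
                        && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                    ctx.writeAndFlush(new PingMessage())
                            .addListener(future -> {
                                if (!future.isSuccess()) {
                                    ctx.close();
                                }
                            });
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        });
    }
}
{code}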

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found Half-opend-socket (see 
[https://en.wikipedia.org/wiki/TCP_half-open] ) which is  already closed on 
server side but established on client side,lead to this: 

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient. After write ErrorResponse success(maybe only write to 
tcp socket buffer success), server will close the channel (socket will be 
transformed to fin_wait1 status)

    2. PartitionRequestClient doesn't receive the ErrorResponse and server's 
FIN, so client will keep socket be establised status and waiting for 
BufferResponse from server (maybe our machine's kernel-bug lead to 
ErrorResponse and FIN lost )

    3. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status, and so lead to Half-opened-socket

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found Half-opend-socket (see 
> [https://en.wikipedia.org/wiki/TCP_half-open] ) which is  already closed on 
> server side but established on client side,lead to this: 
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will release the NetworkSequenceViewReader (which is 
> responsable to send data to 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found a half-open socket 
(see [https://en.wikipedia.org/wiki/TCP_half-open]) that was already closed on 
the server side but still established on the client side, which leads to this: 

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient. After the ErrorResponse is written successfully 
(maybe it was only written into the TCP socket buffer), the server closes the 
channel (the socket moves to the fin_wait1 state).

    2. The PartitionRequestClient receives neither the ErrorResponse nor the 
server's FIN, so it keeps the socket in the established state and waits for a 
BufferResponse from the server (maybe a kernel bug on our machines caused the 
ErrorResponse and FIN to be lost).

    3. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established, which leads to 
the half-open socket.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found [ Half-opend-socket| 
[https://en.wikipedia.org/wiki/TCP_half-open 
|https://en.wikipedia.org/wiki/TCP_half-open]] which is  already closed on 
server side, but established on client side

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network 
congestion or our machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found Half-opend-socket (see 
> [https://en.wikipedia.org/wiki/TCP_half-open] ) which is  already closed on 
> server side but established on client side,lead to this: 
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found a 
[half-open socket|https://en.wikipedia.org/wiki/TCP_half-open] that was 
already closed on the server side but still established on the client side:

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe 
network congestion or a kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found [half-opend 
socket|[https://en.wikipedia.org/wiki/TCP_half-open]] which is  already closed 
on server side, but established on client side

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network 
congestion or our machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found [ Half-opend-socket| 
> [https://en.wikipedia.org/wiki/TCP_half-open 
> |https://en.wikipedia.org/wiki/TCP_half-open]] which is  already closed on 
> server side, but established on client side
>     1. NettyServer encounter ReadTimeoutException when 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found a 
[half-open socket|https://en.wikipedia.org/wiki/TCP_half-open] that was 
already closed on the server side but still established on the client side:

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe 
network congestion or a kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found[ half-opend 
socket|[https://en.wikipedia.org/wiki/TCP_half-open],] which is  already closed 
on server side, but established on client side

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network 
congestion or our machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found [half-opend 
> socket|[https://en.wikipedia.org/wiki/TCP_half-open]] which is  already 
> closed on server side, but established on client side
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will release the NetworkSequenceViewReader (which is 
> 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found a 
[half-open socket|https://en.wikipedia.org/wiki/TCP_half-open] that was 
already closed on the server side but still established on the client side:

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe 
network congestion or a kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found the reason:  (Machine's kernel we used may have bug that will 
lost socket event )   

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network 
congestion or our machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found[ half-opend 
> socket|[https://en.wikipedia.org/wiki/TCP_half-open],] which is  already 
> closed on server side, but established on client side
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will release the NetworkSequenceViewReader (which is 
> responsable to send data to PartitionRequestClient) and write 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found the reason (the 
kernel on the machines we use may have a bug that loses socket events):

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe 
network congestion or a kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its all donwStreamTask is idle, job will 
keep this status until chk is timeout(use aligned chk); After we analyse this 
case, we found the reason:  (Machine's kernel we used may have bug that will 
lost socket event )   

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our 
machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found the reason:  (Machine's kernel we used may have bug that 
> will lost socket event )   
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will release the NetworkSequenceViewReader (which is 
> responsable to send data to PartitionRequestClient) and write ErrorResponse 
> to PartitionRequestClient;
>     2. PartitionRequestClient doesn't receive the ErrorResponse (maybe 
> network 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but all of its downstream tasks are 
idle, and the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found the reason (the 
kernel on the machines we use may have a bug that loses socket events):

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe a 
kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its donwStreamTask is idle, job will keep 
this status until chk is timeout(use aligned chk); After we analyse this case, 
we found the reason:  (Machine's kernel we used may have bug that will lost 
socket event )   

    1. NettyServer encounter ReadTimeoutException when read data from channel, 
then it will release the NetworkSequenceViewReader (which is responsable to 
send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;

    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our 
machine's kernel-bug lead to this)

    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:

    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).

    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.

For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In out production environment, we encounter one abnormal case:
>     upstreamTask is backpressured but its all donwStreamTask is idle, job 
> will keep this status until chk is timeout(use aligned chk); After we analyse 
> this case, we found the reason:  (Machine's kernel we used may have bug that 
> will lost socket event )   
>     1. NettyServer encounter ReadTimeoutException when read data from 
> channel, then it will release the NetworkSequenceViewReader (which is 
> responsable to send data to PartitionRequestClient) and write ErrorResponse 
> to PartitionRequestClient;
>     2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our 
> machine's kernel-bug lead to this)

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    an upstream task is backpressured but its downstream tasks are idle, and 
the job stays in this state until the checkpoint times out (aligned 
checkpoints are used). After analysing this case, we found the reason (the 
kernel on the machines we use may have a bug that loses socket events):

    1. The NettyServer hits a ReadTimeoutException when reading data from the 
channel, so it releases the NetworkSequenceViewReader (which is responsible 
for sending data to the PartitionRequestClient) and writes an ErrorResponse to 
the PartitionRequestClient;

    2. The PartitionRequestClient doesn't receive the ErrorResponse (maybe a 
kernel bug on our machines caused this);

    3. After writing the ErrorResponse, the NettyServer closes the channel 
(the socket moves to the fin_wait1 state), but the client machine doesn't 
receive the server's FIN, so it treats the channel as ok and keeps waiting for 
the server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket if it stays in fin_wait1 for too 
long, but the socket on the client machine remains established.

To avoid this case, I think there are two methods:

    1. The client enables TCP keep-alive (Flink already enables it): this also 
requires adjusting the machine's tcp-keep-alive time (the default of 7200 
seconds is too long).

    2. The client uses Netty's IdleStateHandler to detect whether the channel 
is idle (for reads or writes); if the channel is idle, the client writes a 
ping message to the server to check whether the channel is really ok.

Of the two methods, I recommend method 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In out production environment, we encounter one abnormal case:

    upstreamTask is backpressured but its donwStreamTask is idle, job will keep 
this status until chk is timeout(use aligned chk); After we analyse this case, 
we found the reason:  (Machine's kernel we used may have bug that will lost 
socket event )    1. NettyServer encounter ReadTimeoutException when read data 
from channel, then it will release the NetworkSequenceViewReader (which is 
responsable to send data to PartitionRequestClient) and write ErrorResponse to 
PartitionRequestClient;
    2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our 
machine's kernel-bug lead to this)
    3. NettyServer after write ErrorResponse, it will close the channel (socket 
will be transformed to fin_wait1 status), but client machine doesn't receive 
the Server's fin, so it will treat the channel is ok, and will keep waiting for 
server's BufferReponse (But server is already release correlative 
NetworkSequenceViewReader)

    4. Server machine will release the socket if it keep fin_wait1 status for 
two long time, but the socket on client machine is also under established 
status.

To avoid this case,I think there are two methods:
    1. Client enable TCP keep alive(flink is already enabled): this way should 
also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time 
is 7200 seconds, which is two long).
    2. Client use netty‘s IdleStateHandler to detect whether channel is 
idle(read or write), if channel is idle, client will try to write pingMsg to 
server to detect whether channel is really ok.
For the two methods, i recommend the method-2, because adjustment of machine's 
tcp-keep-alive time will have an impact on other service running on the same 
machine

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment, we encountered one abnormal case:
>     The upstream task is backpressured while its downstream task is idle, and 
> the job stays in this state until the checkpoint times out (we use aligned 
> checkpoints). After analysing the case, we found the reason (the kernel on 
> our machines may have a bug that loses socket events):
>     1. The NettyServer encounters a ReadTimeoutException while reading data 
> from the channel; it then releases the NetworkSequenceViewReader (which is 
> responsible for sending data to the PartitionRequestClient) and writes an 
> ErrorResponse to the PartitionRequestClient;
>     2. The PartitionRequestClient does not receive the ErrorResponse 
> (possibly due to the kernel bug on our machines);
>     3. 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    The upstream task is backpressured while its downstream task is idle, and 
the job stays in this state until the checkpoint times out (we use aligned 
checkpoints). After analysing the case, we found the reason (the kernel on our 
machines may have a bug that loses socket events):

    1. The NettyServer encounters a ReadTimeoutException while reading data 
from the channel; it then releases the NetworkSequenceViewReader (which is 
responsible for sending data to the PartitionRequestClient) and writes an 
ErrorResponse to the PartitionRequestClient;

    2. The PartitionRequestClient does not receive the ErrorResponse (possibly 
due to the kernel bug on our machines);

    3. After writing the ErrorResponse, the NettyServer closes the channel (the 
socket moves to FIN_WAIT1 state), but the client machine never receives the 
server's FIN, so it treats the channel as healthy and keeps waiting for the 
server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket after it has stayed in FIN_WAIT1 
for too long, but the socket on the client machine remains in ESTABLISHED 
state.

To avoid this case, I think there are two options:

    1. Enable TCP keep-alive on the client (Flink already enables it): this 
also requires tuning the machine's tcp-keep-alive time (the default is 7200 
seconds, which is too long).

    2. Use Netty's IdleStateHandler on the client to detect whether the channel 
is idle (for read or write); if it is idle, the client writes a ping message to 
the server to check whether the channel is really healthy.

Of the two, I recommend option 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In our production environment, we encountered one abnormal case:

    The upstream task is backpressured while its downstream task is idle, and 
the job stays in this state until the checkpoint times out (we use aligned 
checkpoints). After analysing the case, we found the reason (the kernel on our 
machines may have a bug that loses socket events):

    1. The NettyServer encounters a ReadTimeoutException while reading data 
from the channel; it then releases the NetworkSequenceViewReader (which is 
responsible for sending data to the PartitionRequestClient) and writes an 
ErrorResponse to the PartitionRequestClient;

    2. The PartitionRequestClient does not receive the ErrorResponse (possibly 
due to the kernel bug on our machines);

    3. After writing the ErrorResponse, the NettyServer closes the channel (the 
socket moves to FIN_WAIT1 state), but the client machine never receives the 
server's FIN, so it treats the channel as healthy and keeps waiting for the 
server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket after it has stayed in FIN_WAIT1 
for too long, but the socket on the client machine remains in ESTABLISHED 
state.

To avoid this case, I think there are two options:

    1. Enable TCP keep-alive on the client (Flink already enables it): this 
also requires tuning the machine's tcp-keep-alive time (the default is 7200 
seconds, which is too long).

    2. Use Netty's IdleStateHandler on the client to detect whether the channel 
is idle (for read or write); if it is idle, the client writes a ping message to 
the server to check whether the channel is really healthy.

Of the two, I recommend option 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment, we encountered one abnormal case:
>     The upstream task is backpressured while its downstream task is idle, and 
> the job stays in this state until the checkpoint times out (we use aligned 
> checkpoints). After analysing the case, we found the reason (the kernel on 
> our machines may have a bug that loses socket events):
>     1. The NettyServer encounters a ReadTimeoutException while reading data 
> from the channel; it then releases the NetworkSequenceViewReader (which is 
> responsible for sending data to the PartitionRequestClient) and writes an 
> ErrorResponse to the PartitionRequestClient;
>     2. The PartitionRequestClient does not receive the ErrorResponse 
> (possibly due to the kernel bug on our machines);
>     3. NettyServer after write 

[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Description: 
In our production environment, we encountered one abnormal case:

    The upstream task is backpressured while its downstream task is idle, and 
the job stays in this state until the checkpoint times out (we use aligned 
checkpoints). After analysing the case, we found the reason (the kernel on our 
machines may have a bug that loses socket events):

    1. The NettyServer encounters a ReadTimeoutException while reading data 
from the channel; it then releases the NetworkSequenceViewReader (which is 
responsible for sending data to the PartitionRequestClient) and writes an 
ErrorResponse to the PartitionRequestClient;

    2. The PartitionRequestClient does not receive the ErrorResponse (possibly 
due to the kernel bug on our machines);

    3. After writing the ErrorResponse, the NettyServer closes the channel (the 
socket moves to FIN_WAIT1 state), but the client machine never receives the 
server's FIN, so it treats the channel as healthy and keeps waiting for the 
server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket after it has stayed in FIN_WAIT1 
for too long, but the socket on the client machine remains in ESTABLISHED 
state.

To avoid this case, I think there are two options:

    1. Enable TCP keep-alive on the client (Flink already enables it): this 
also requires tuning the machine's tcp-keep-alive time (the default is 7200 
seconds, which is too long).

    2. Use Netty's IdleStateHandler on the client to detect whether the channel 
is idle (for read or write); if it is idle, the client writes a ping message to 
the server to check whether the channel is really healthy.

Of the two, I recommend option 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 

  was:
In our production environment, we encountered one abnormal case:

    The upstream task is backpressured while its downstream task is idle, and 
the job stays in this state until the checkpoint times out (we use aligned 
checkpoints). After analysing the case, we found the reason (the kernel on our 
machines may have a bug that loses socket events):

    1. The NettyServer encounters a ReadTimeoutException while reading data 
from the channel; it then releases the NetworkSequenceViewReader (which is 
responsible for sending data to the PartitionRequestClient) and writes an 
ErrorResponse to the PartitionRequestClient;

    2. The PartitionRequestClient does not receive the ErrorResponse (possibly 
due to the kernel bug on our machines);

    3. After writing the ErrorResponse, the NettyServer closes the channel (the 
socket moves to FIN_WAIT1 state), but the client machine never receives the 
server's FIN, so it treats the channel as healthy and keeps waiting for the 
server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket after it has stayed in FIN_WAIT1 
for too long, but the socket on the client machine remains in ESTABLISHED 
state.

To avoid this case, I think there are two options:

    1. Enable TCP keep-alive on the client (Flink already enables it): this 
also requires tuning the machine's tcp-keep-alive time (the default is 7200 
seconds, which is too long).

    2. Use Netty's IdleStateHandler on the client to detect whether the channel 
is idle (for read or write); if it is idle, the client writes a ping message to 
the server to check whether the channel is really healthy.

Of the two, I recommend option 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.

 


> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment, we encountered one abnormal case:
>     The upstream task is backpressured while its downstream task is idle, and 
> the job stays in this state until the checkpoint times out (we use aligned 
> checkpoints). After analysing the case, we found the reason (the kernel on 
> our machines may have a bug that loses socket events):
>     1. The NettyServer encounters a ReadTimeoutException while reading data 
> from the channel; it then releases the NetworkSequenceViewReader (which is 
> responsible for sending data to the PartitionRequestClient) and writes an 
> ErrorResponse to the PartitionRequestClient;
>     2. The PartitionRequestClient does not receive the ErrorResponse 
> (possibly due to the kernel bug on our machines);
>     3. NettyServer after 

[jira] [Created] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-26080:
---

 Summary: PartitionRequest client use Netty's IdleStateHandler to 
monitor channel's status
 Key: FLINK-26080
 URL: https://issues.apache.org/jira/browse/FLINK-26080
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Cai Liuyang


In our production environment, we encountered one abnormal case:

    The upstream task is backpressured while its downstream task is idle, and 
the job stays in this state until the checkpoint times out (we use aligned 
checkpoints). After analysing the case, we found the reason (the kernel on our 
machines may have a bug that loses socket events):

    1. The NettyServer encounters a ReadTimeoutException while reading data 
from the channel; it then releases the NetworkSequenceViewReader (which is 
responsible for sending data to the PartitionRequestClient) and writes an 
ErrorResponse to the PartitionRequestClient;

    2. The PartitionRequestClient does not receive the ErrorResponse (possibly 
due to the kernel bug on our machines);

    3. After writing the ErrorResponse, the NettyServer closes the channel (the 
socket moves to FIN_WAIT1 state), but the client machine never receives the 
server's FIN, so it treats the channel as healthy and keeps waiting for the 
server's BufferResponse (while the server has already released the 
corresponding NetworkSequenceViewReader);

    4. The server machine releases the socket after it has stayed in FIN_WAIT1 
for too long, but the socket on the client machine remains in ESTABLISHED 
state.

To avoid this case, I think there are two options:

    1. Enable TCP keep-alive on the client (Flink already enables it): this 
also requires tuning the machine's tcp-keep-alive time (the default is 7200 
seconds, which is too long); a minimal sketch of this option follows below.

    2. Use Netty's IdleStateHandler on the client to detect whether the channel 
is idle (for read or write); if it is idle, the client writes a ping message to 
the server to check whether the channel is really healthy.

Of the two, I recommend option 2, because adjusting the machine's 
tcp-keep-alive time would affect other services running on the same machine.
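
For comparison, here is a minimal sketch of option 1: enabling TCP keep-alive 
on the client socket through Netty's bootstrap options. The class name is 
hypothetical, and the probe timing is still governed by kernel settings such as 
net.ipv4.tcp_keepalive_time (7200 seconds by default), which is exactly why 
this option alone reacts slowly to half-open connections.
{code:java}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/** Sketch only: option 1, relying on kernel-level TCP keep-alive probes. */
public final class KeepAliveBootstrapSketch {

    public static Bootstrap newClientBootstrap(NioEventLoopGroup group) {
        return new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                // Asks the kernel to probe the peer, but only after the OS-level
                // keep-alive idle time (net.ipv4.tcp_keepalive_time) has elapsed.
                .option(ChannelOption.SO_KEEPALIVE, true);
        // The channel handler and connect() call are omitted in this sketch.
    }
}
{code}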

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-26080:

Affects Version/s: 1.14.3

> PartitionRequest client use Netty's IdleStateHandler to monitor channel's 
> status
> 
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>
> In our production environment, we encountered one abnormal case:
>     The upstream task is backpressured while its downstream task is idle, and 
> the job stays in this state until the checkpoint times out (we use aligned 
> checkpoints). After analysing the case, we found the reason (the kernel on 
> our machines may have a bug that loses socket events):
>     1. The NettyServer encounters a ReadTimeoutException while reading data 
> from the channel; it then releases the NetworkSequenceViewReader (which is 
> responsible for sending data to the PartitionRequestClient) and writes an 
> ErrorResponse to the PartitionRequestClient;
>     2. The PartitionRequestClient does not receive the ErrorResponse 
> (possibly due to the kernel bug on our machines);
>     3. After writing the ErrorResponse, the NettyServer closes the channel 
> (the socket moves to FIN_WAIT1 state), but the client machine never receives 
> the server's FIN, so it treats the channel as healthy and keeps waiting for 
> the server's BufferResponse (while the server has already released the 
> corresponding NetworkSequenceViewReader);
>     4. The server machine releases the socket after it has stayed in 
> FIN_WAIT1 for too long, but the socket on the client machine remains in 
> ESTABLISHED state.
> To avoid this case, I think there are two options:
>     1. Enable TCP keep-alive on the client (Flink already enables it): this 
> also requires tuning the machine's tcp-keep-alive time (the default is 7200 
> seconds, which is too long).
>     2. Use Netty's IdleStateHandler on the client to detect whether the 
> channel is idle (for read or write); if it is idle, the client writes a ping 
> message to the server to check whether the channel is really healthy.
> Of the two, I recommend option 2, because adjusting the machine's 
> tcp-keep-alive time would affect other services running on the same machine.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-02-10 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-25664:

Component/s: Runtime / Network

> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> For now, there can be a case like this:
>  # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubpartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, two finished buffers are 
> added to this PipelinedSubpartition (there is no limit on adding buffers to a 
> blocked PipelinedSubpartition)
>  ## adding the first finished buffer does not trigger notifyDataAvailable 
> because isBlocked == true
>  ## adding the second finished buffer also does not trigger 
> notifyDataAvailable because isBlocked == true and finishedBuffer > 1
>  # The downstream resumes consumption and the PipelinedSubpartition is 
> unblocked (isBlocked == false)
>  # The OutputFlusher's call into the PipelinedSubpartition does not trigger 
> notifyDataAvailable because finishedBuffer > 1
> In conclusion, there are three cases in which we should trigger 
> notifyDataAvailable:
>     case 1: only one finished buffer (handled by add)
>     case 2: only one unfinished buffer (handled by flush)
>     case 3: more than one finished buffer, added while the 
> PipelinedSubpartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-02-07 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-25664:

Affects Version/s: 1.14.3

> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.3
>Reporter: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> For now, there can be a case like this:
>  # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubpartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, two finished buffers are 
> added to this PipelinedSubpartition (there is no limit on adding buffers to a 
> blocked PipelinedSubpartition)
>  ## adding the first finished buffer does not trigger notifyDataAvailable 
> because isBlocked == true
>  ## adding the second finished buffer also does not trigger 
> notifyDataAvailable because isBlocked == true and finishedBuffer > 1
>  # The downstream resumes consumption and the PipelinedSubpartition is 
> unblocked (isBlocked == false)
>  # The OutputFlusher's call into the PipelinedSubpartition does not trigger 
> notifyDataAvailable because finishedBuffer > 1
> In conclusion, there are three cases in which we should trigger 
> notifyDataAvailable:
>     case 1: only one finished buffer (handled by add)
>     case 2: only one unfinished buffer (handled by flush)
>     case 3: more than one finished buffer, added while the 
> PipelinedSubpartition is blocked (not handled)
> {code:java}
> // test code for this case
> // add this test case to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-01-16 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-25664:

Description: 
For now, there can be a case like this:
 # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
(isBlocked == false)
 # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
PipelinedSubpartition becomes blocked (isBlocked == true)
 # Before the downstream calls resumeConsumption, two finished buffers are 
added to this PipelinedSubpartition (there is no limit on adding buffers to a 
blocked PipelinedSubpartition)
 ## adding the first finished buffer does not trigger notifyDataAvailable 
because isBlocked == true
 ## adding the second finished buffer also does not trigger notifyDataAvailable 
because isBlocked == true and finishedBuffer > 1
 # The downstream resumes consumption and the PipelinedSubpartition is 
unblocked (isBlocked == false)
 # The OutputFlusher's call into the PipelinedSubpartition does not trigger 
notifyDataAvailable because finishedBuffer > 1

In conclusion, there are three cases in which we should trigger 
notifyDataAvailable (a simplified standalone model of these cases follows after 
the test code below):
    case 1: only one finished buffer (handled by add)
    case 2: only one unfinished buffer (handled by flush)
    case 3: more than one finished buffer, added while the 
PipelinedSubpartition is blocked (not handled)
{code:java}
// test code for this case
// add this test case to
// org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
        throws Exception {
    blockSubpartitionByCheckpoint(1);

    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));

    assertEquals(1, availablityListener.getNumNotifications());
    readView.resumeConsumption();
    subpartition.flush();
    assertEquals(2, availablityListener.getNumNotifications());
} {code}
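
To make the three cases concrete, the sketch below is a small standalone model 
of the availability decision. It is not the actual PipelinedSubpartition code; 
the field and method names, in particular buffersAddedWhileBlocked (the piece 
of state that case 3 is currently missing), are hypothetical.
{code:java}
/**
 * Standalone model of the three notification cases above; illustration only,
 * not Flink's implementation. All names here are hypothetical.
 */
public final class NotifyDecisionModel {

    private int finishedBuffers;              // fully written buffers queued for the reader
    private boolean hasUnfinishedBuffer;      // a partially written buffer a flush can expose
    private boolean blocked;                  // barrier consumed, waiting for resumeConsumption
    private boolean buffersAddedWhileBlocked; // the state that case 3 needs remembered

    /** Case 1: the first finished buffer added to an unblocked subpartition notifies. */
    boolean shouldNotifyOnAdd() {
        if (blocked) {
            buffersAddedWhileBlocked = true; // remembered for resumeConsumption (case 3)
            return false;
        }
        return finishedBuffers == 1;
    }

    /** Case 2: a flush exposes the single unfinished buffer. */
    boolean shouldNotifyOnFlush() {
        return !blocked && hasUnfinishedBuffer && finishedBuffers == 0;
    }

    /** Case 3: buffers that piled up while blocked are announced when consumption resumes. */
    boolean shouldNotifyOnResumeConsumption() {
        blocked = false;
        boolean notify = buffersAddedWhileBlocked && finishedBuffers > 0;
        buffersAddedWhileBlocked = false;
        return notify;
    }
}
{code}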

  was:
For now, there can be a case like this:
 # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
(isBlocked == false)
 # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
PipelinedSubpartition becomes blocked (isBlocked == true)
 # Before the downstream calls resumeConsumption, two finished buffers are 
added to this PipelinedSubpartition (there is no limit on adding buffers to a 
blocked PipelinedSubpartition)
 ## adding the first finished buffer does not trigger notifyDataAvailable 
because isBlocked == true
 ## adding the second finished buffer also does not trigger notifyDataAvailable 
because isBlocked == true and finishedBuffer > 1
 # The downstream resumes consumption and the PipelinedSubpartition is 
unblocked (isBlocked == false)
 # The OutputFlusher's call into the PipelinedSubpartition does not trigger 
notifyDataAvailable because finishedBuffer > 1

In conclusion, there are three cases in which we should trigger 
notifyDataAvailable:
    case 1: only one finished buffer (handled by add)
    case 2: only one unfinished buffer (handled by flush)
    case 3: more than one finished buffer, added while the 
PipelinedSubpartition is blocked (not handled)
{code:java}
// test code for this case
// add this test case to
// org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
        throws Exception {
    blockSubpartitionByCheckpoint(1);

    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));

    assertEquals(1, availablityListener.getNumNotifications());
    readView.resumeConsumption();
    subpartition.flush();
    assertEquals(2, availablityListener.getNumNotifications());
} {code}


> Notify will be not triggered for PipelinedSubpartition if more than one 
> buffer is added during isBlocked == true
> 
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
>  Issue Type: Bug
>Reporter: Cai Liuyang
>Priority: Major
>
> For now, there can be a case like this:
>  # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
> (isBlocked == false)
>  # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
> PipelinedSubpartition becomes blocked (isBlocked == true)
>  # Before the downstream calls resumeConsumption, two finished buffers are 
> added to this PipelinedSubpartition (there is no limit on adding buffers to a 
> blocked PipelinedSubpartition)
>  ## adding the first finished buffer does not trigger notifyDataAvailable 
> because isBlocked == true
>  ## adding the second finished buffer also does not trigger 
> notifyDataAvailable because isBlocked == true and finishedBuffer > 1
>  # The downstream resumes consumption and the PipelinedSubpartition is 
> unblocked (isBlocked == false)
>  # 

[jira] [Created] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

2022-01-16 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-25664:
---

 Summary: Notify will be not triggered for PipelinedSubpartition if 
more than one buffer is added during isBlocked == true
 Key: FLINK-25664
 URL: https://issues.apache.org/jira/browse/FLINK-25664
 Project: Flink
  Issue Type: Bug
Reporter: Cai Liuyang


For now, there can be a case like this:
 # The PipelinedSubpartition holds only one aligned-checkpoint-barrier buffer 
(isBlocked == false)
 # The CreditBasedSequenceNumberingViewReader polls this buffer and the 
PipelinedSubpartition becomes blocked (isBlocked == true)
 # Before the downstream calls resumeConsumption, two finished buffers are 
added to this PipelinedSubpartition (there is no limit on adding buffers to a 
blocked PipelinedSubpartition)
 ## adding the first finished buffer does not trigger notifyDataAvailable 
because isBlocked == true
 ## adding the second finished buffer also does not trigger notifyDataAvailable 
because isBlocked == true and finishedBuffer > 1
 # The downstream resumes consumption and the PipelinedSubpartition is 
unblocked (isBlocked == false)
 # The OutputFlusher's call into the PipelinedSubpartition does not trigger 
notifyDataAvailable because finishedBuffer > 1

In conclusion, there are three cases in which we should trigger 
notifyDataAvailable:
    case 1: only one finished buffer (handled by add)
    case 2: only one unfinished buffer (handled by flush)
    case 3: more than one finished buffer, added while the 
PipelinedSubpartition is blocked (not handled)
{code:java}
// test code for this case
// add this test case to
// org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
        throws Exception {
    blockSubpartitionByCheckpoint(1);

    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));

    assertEquals(1, availablityListener.getNumNotifications());
    readView.resumeConsumption();
    subpartition.flush();
    assertEquals(2, availablityListener.getNumNotifications());
} {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)