[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33934: Description:

In production we hit a case of data loss. The job setup:
1. A Flink SQL job reads data from a message queue (our internal MQ) and writes to Hive, selecting only the value field (no metadata fields).
2. The source table uses the raw format.

If we select the value field and a metadata field at the same time, the data loss does not occur.

After reviewing the code, we found the cause is object reuse in the raw format (see [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
1. RawFormatDeserializationSchema runs in the fetcher thread of the SourceOperator: the fetcher thread reads and deserializes data from the Kafka partition, then puts it into the ElementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
3. RawFormatDeserializationSchema's deserialize method returns the same object on every call (see [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
4. So, while the ElementQueue still holds unconsumed elements, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, leading to data loss.

The reason selecting both the value field and a metadata field avoids the data loss: when a metadata field is selected, a new RowData object is created per record (see [DynamicKafkaDeserializationSchema deserialize with metadata fields|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]); when only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize with only the value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).

To solve this problem, I think we should remove object reuse from RawFormatDeserializationSchema.
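The overwrite scenario in the steps above can be sketched with a minimal, hypothetical deserializer (the `Row` and `ReusingDeserializer` names are illustrative, not Flink's actual classes). Every `deserialize` call returns the same instance, so once a second record is deserialized before the queue is drained, the first enqueued element has already been overwritten:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal model of the object-reuse hazard: deserialize() always returns
// the SAME row instance, mirroring RawFormatDeserializationSchema's pattern.
class ReuseHazard {
    static final class Row {
        String value;
    }

    static final class ReusingDeserializer {
        private final Row reused = new Row(); // single reused instance

        Row deserialize(String payload) {
            reused.value = payload; // overwrite the shared object's field
            return reused;          // hand out the same reference every time
        }
    }

    public static void main(String[] args) {
        ReusingDeserializer schema = new ReusingDeserializer();
        Queue<Row> elementQueue = new ArrayDeque<>();

        // "Fetcher thread": deserializes two records before the consumer drains the queue.
        elementQueue.add(schema.deserialize("record-1"));
        elementQueue.add(schema.deserialize("record-2"));

        // "Main thread": drains the queue afterwards. Both entries are the same
        // object, so record-1 has been silently overwritten (data loss).
        Row first = elementQueue.poll();
        Row second = elementQueue.poll();
        System.out.println(first.value);     // prints "record-2", not "record-1"
        System.out.println(first == second); // prints "true"
    }
}
```

The sketch is single-threaded for determinism; in the real job the same interleaving arises whenever the fetcher thread deserializes a new record while earlier elements still sit in the shared ElementQueue.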
[jira] [Comment Edited] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800273#comment-17800273 ] Cai Liuyang edited comment on FLINK-33934 at 12/25/23 11:41 AM:

[~hackergin] Yeah, turning off object reuse for every DeserializationSchema used by the SourceOperator is the simplest way to avoid this data-loss problem.

-- update --

After reviewing the Kafka connector source: Kafka deserializes from ConsumerRecord to the real object in the SourceOperator's main thread, so it does not hit this problem (see [KafkaRecordEmitter code|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L53C43-L53C43]). So the problem only appears when DeserializationSchema::deserialize is called in the fetcher thread.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
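The safe arrangement the comment describes (emit-time deserialization, as in KafkaRecordEmitter) can be sketched as follows. This is an illustrative model, not Flink's actual code: the fetcher enqueues raw payloads only, and the consuming thread deserializes one record at a time and hands the result downstream before the reused object can be overwritten:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of emit-time deserialization: raw bytes travel through the shared
// queue; deserialization happens only in the consuming (main) thread.
class EmitTimeDeserialization {
    static final class Row {
        String value;
    }

    static final class ReusingDeserializer {
        private final Row reused = new Row(); // reuse is now harmless

        Row deserialize(byte[] payload) {
            reused.value = new String(payload);
            return reused;
        }
    }

    public static void main(String[] args) {
        Queue<byte[]> elementQueue = new ArrayDeque<>();
        // "Fetcher thread": enqueue raw payloads only, no deserialization here.
        elementQueue.add("record-1".getBytes());
        elementQueue.add("record-2".getBytes());

        // "Main thread": deserialize each record and consume the result (here:
        // copy the field out) before the next deserialize call reuses the object.
        ReusingDeserializer schema = new ReusingDeserializer();
        List<String> emitted = new ArrayList<>();
        byte[] raw;
        while ((raw = elementQueue.poll()) != null) {
            Row row = schema.deserialize(raw);
            emitted.add(row.value); // consumed before the object is reused again
        }
        System.out.println(emitted); // prints [record-1, record-2]
    }
}
```

Because deserialize and emit alternate within a single thread, object reuse never clobbers an element that is still waiting in the queue.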
[jira] [Commented] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800273#comment-17800273 ] Cai Liuyang commented on FLINK-33934: [~hackergin] Yeah, turning off object reuse for every DeserializationSchema used by the SourceOperator is the simplest way to avoid this data-loss problem.
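The fix proposed in the description (removing object reuse from RawFormatDeserializationSchema) amounts to allocating a fresh row per deserialize call. A minimal sketch, with illustrative class and field names rather than Flink's actual ones:

```java
// Sketch of the proposed fix: no shared instance, so enqueued rows are
// never overwritten by later deserialize calls.
class NonReusingDeserializer {
    static final class Row {
        String value;
    }

    Row deserialize(byte[] payload) {
        Row row = new Row();             // new instance per call: safe to enqueue
        row.value = new String(payload);
        return row;
    }

    public static void main(String[] args) {
        NonReusingDeserializer d = new NonReusingDeserializer();
        Row a = d.deserialize("record-1".getBytes());
        Row b = d.deserialize("record-2".getBytes());
        System.out.println(a.value); // prints "record-1" (not overwritten)
        System.out.println(a == b);  // prints "false"
    }
}
```

The trade-off is one allocation per record instead of zero, which is the usual price of making a deserializer safe to call from a thread other than the consumer.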
[jira] [Created] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
Cai Liuyang created FLINK-33934: --- Summary: Flink SQL Source use raw format maybe lead to data lost Key: FLINK-33934 URL: https://issues.apache.org/jira/browse/FLINK-33934 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime Reporter: Cai Liuyang In our product we encountered a case of data loss; the job info: 1. a Flink SQL job that reads data from a message queue and writes to Hive (only the value field is selected, no metadata fields) 2. the format of the source table is the raw format. But if we select the value field and a metadata field at the same time, the data loss does not appear. After reviewing the code, we found that the cause is the object reuse of the raw format (see code [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]); why object reuse leads to this problem is explained below (taking Kafka as an example): 1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator; the fetcher thread reads and deserializes data from the Kafka partition, then puts the data into the ElementQueue (see code [SourceOperator FetcherTask |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]) 2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see code [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]) 3. 
For RawFormatDeserializationSchema, its deserialize function returns the same object every time ([reuse object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]) 4. So, if the elementQueue contains elements that have not yet been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss. The reason that selecting the value field and a metadata field at the same time does not lose data is: if we select a metadata field, a new RowData object is returned for each record, see code: [DynamicKafkaDeserializationSchema deserialize with metadata field |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249] and if we only select the value field, the RowData object returned by the format's DeserializationSchema is reused, see code [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113] To solve this problem, I think we should disable object reuse in RawFormatDeserializationSchema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
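To make the race described above concrete, here is a minimal self-contained sketch (hypothetical class names, not Flink's real ones) of why a deserializer that returns one reused mutable object is unsafe once a queue decouples the fetcher thread from the main thread:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReuseHazard {
    // Stand-in for the single reused row inside the deserializer.
    static final class MutableRow { String value; }

    public static void main(String[] args) {
        MutableRow reused = new MutableRow();          // one instance, reused for every record
        Queue<MutableRow> elementQueue = new ArrayDeque<>();

        // "Fetcher thread": deserialize record A and enqueue the reused object...
        reused.value = "A";
        elementQueue.add(reused);
        // ...then deserialize record B before the main thread has drained the queue,
        // overwriting the object that is still sitting in the queue.
        reused.value = "B";
        elementQueue.add(reused);

        // "Main thread": both dequeued elements now read "B" -- record A is lost.
        System.out.println(elementQueue.poll().value); // prints B
        System.out.println(elementQueue.poll().value); // prints B
    }
}
```

Returning a fresh object per record (or copying before enqueueing) avoids the overwrite, which is why selecting a metadata field, where a new RowData is built, masks the bug.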
[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type
[ https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33759: Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81) at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] and [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. 
was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81) at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. 
> flink parquet writer support write nested array or map type > --- > > Key: FLINK-33759 > URL: https://issues.apache.org/jira/browse/FLINK-33759 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Cai Liuyang >Priority: Major > > When we use flink-parquet format wirte Map[] type (which will > be read by spark job), we encounter an exception: > {code:java} > // code placeholder > Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are > illegal, the field should be ommited completely instead > at > org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81) > at > org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code} > after review the code, we found flink-parquet doesn't support write nested > array or map, because > [[ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437]|#L437] > and > [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] > doesn't impl `public void write(ArrayData arrayData, int ordinal) {}` > function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
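As a toy illustration of the missing piece (simplified names, not the real ParquetRowDataWriter classes): nesting only works when the array writer itself implements the per-element write hook, so that it can serve as the element writer of an outer array. A sketch of that recursive structure:

```java
import java.util.List;

public class NestedWriterSketch {
    // Per-element write hook; the JIRA says the real ArrayWriter/MapWriter lack this.
    interface FieldWriter { void writeElement(Object element); }

    // Leaf writer for primitive elements.
    static final class IntWriter implements FieldWriter {
        final StringBuilder out;
        IntWriter(StringBuilder out) { this.out = out; }
        public void writeElement(Object element) { out.append(element).append(' '); }
    }

    // Because ArrayWriter implements the same per-element interface, it can be
    // used as the element writer of an outer ArrayWriter, enabling ARRAY<ARRAY<T>>.
    static final class ArrayWriter implements FieldWriter {
        final FieldWriter elementWriter;
        final StringBuilder out;
        ArrayWriter(FieldWriter elementWriter, StringBuilder out) {
            this.elementWriter = elementWriter;
            this.out = out;
        }
        public void writeElement(Object element) {
            out.append("[ ");
            for (Object e : (List<?>) element) {
                elementWriter.writeElement(e);   // recurse one nesting level down
            }
            out.append("] ");
        }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        // ARRAY<ARRAY<INT>>: an array writer whose element writer is another array writer.
        FieldWriter nested = new ArrayWriter(new ArrayWriter(new IntWriter(out), out), out);
        nested.writeElement(List.of(List.of(1, 2), List.of(3)));
        System.out.println(out); // prints: [ [ 1 2 ] [ 3 ] ]
    }
}
```

Without the inner per-element hook, the inner level has no way to emit its contents, which matches the "empty fields are illegal" failure the stack trace shows.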
[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type
[ https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33759: Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471) at org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81) at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead {code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. 
> flink parquet writer support write nested array or map type > --- > > Key: FLINK-33759 > URL: https://issues.apache.org/jira/browse/FLINK-33759 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Cai Liuyang >Priority: Major > > When we use flink-parquet format wirte Map[] type (which will > be read by spark job), we encounter an exception: > {code:java} > // code placeholder > Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are > illegal, the field should be ommited completely instead > at > org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.writeArrayData(ParquetRowDataWriter.java:438) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:419) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:471) > at > org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:81) > at > org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:89){code} > after review the code, we found flink-parquet doesn't support write nested > array or map, because [ArrayWriter|#L437] and [MapWriter|#L391] doesn't impl > `public void write(ArrayData arrayData, int ordinal) {}` function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type
[ https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33759: Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead {code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead {code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. 
> flink parquet writer support write nested array or map type > --- > > Key: FLINK-33759 > URL: https://issues.apache.org/jira/browse/FLINK-33759 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Cai Liuyang >Priority: Major > > When we use flink-parquet format wirte Map[] type (which will > be read by spark job), we encounter an exception: > {code:java} > // code placeholder > org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the > field should be ommited completely instead {code} > after review the code, we found flink-parquet doesn't support write nested > array or map, because [ArrayWriter|#L437] and [MapWriter|#L391] doesn't impl > ```public void write(ArrayData arrayData, int ordinal) {}``` function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type
[ https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33759: Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] and [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. (was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] and [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function.) 
> flink parquet writer support write nested array or map type > --- > > Key: FLINK-33759 > URL: https://issues.apache.org/jira/browse/FLINK-33759 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Cai Liuyang >Priority: Major > > When we use flink-parquet format wirte Map[] type (which will > be read by spark job), we encounter an exception: > `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, > the field should be ommited completely instead`, after review the code, we > found flink-parquet doesn't support write nested array or map, because > [ArrayWriter|[https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437)]] > and > [MapWriter|[https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391)]] > doesn't impl `public void write(ArrayData arrayData, int ordinal) {}` > function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33759) flink parquet writer support write nested array or map type
[ https://issues.apache.org/jira/browse/FLINK-33759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-33759: Description: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: {code:java} // code placeholder org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead {code} After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|#L437] and [MapWriter|#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. was: When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] and [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. 
> flink parquet writer support write nested array or map type > --- > > Key: FLINK-33759 > URL: https://issues.apache.org/jira/browse/FLINK-33759 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Cai Liuyang >Priority: Major > > When we use flink-parquet format wirte Map[] type (which will > be read by spark job), we encounter an exception: > {code:java} > // code placeholder > org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the > field should be ommited completely instead {code} > org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the > field should be ommited completely instead > ``, after review the code, we found flink-parquet doesn't support write > nested array or map, because [ArrayWriter|#L437] and [MapWriter|#L391] > doesn't impl `public void write(ArrayData arrayData, int ordinal) {}` > function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33759) flink parquet writer support write nested array or map type
Cai Liuyang created FLINK-33759: --- Summary: flink parquet writer support write nested array or map type Key: FLINK-33759 URL: https://issues.apache.org/jira/browse/FLINK-33759 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Cai Liuyang When we use the flink-parquet format to write a Map[] type (which will be read by a Spark job), we encounter an exception: `org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead`. After reviewing the code, we found that flink-parquet doesn't support writing nested arrays or maps, because [ArrayWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437] and [MapWriter|https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391] don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
[ https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733451#comment-17733451 ] Cai Liuyang commented on FLINK-32362: - [~fanrui] I compared the two approaches. In the common case, if one subtask is not ready, the other subtasks are very likely also not ready, so the inner catch would print many useless logs (all with the same reason); that is why I chose the outer catch. > SourceAlignment announceCombinedWatermark period task maybe lost > > > Key: FLINK-32362 > URL: https://issues.apache.org/jira/browse/FLINK-32362 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > Labels: pull-request-available > > When we use sourcealignment,we also found there is another problem that > announceCombinedWatermark may throw a exception (like "subtask 25 is not > ready yet to receive events" , this subtask maybe under failover), which will > lead the period task not running any more (ThreadPoolExecutor will not > schedule the period task if it throw a exception) > I think we should increase the robustness of announceCombinedWatermark > function to avoid it throw any exception (if send fail, just wait next send) > (code see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
[ https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733349#comment-17733349 ] Cai Liuyang commented on FLINK-32362: - Two ways: # the simple way, code like: {code:java} // code placeholder try { for (Integer subtaskId : subTaskIds) { context.sendEventToSourceOperator( subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); } } catch (Throwable ignore) { LOG.warn("Announce the newest combined watermark to source failed, task maybe during failover, wait next time to announce."); } {code} # add a trySendEventToSourceOperator() method that only sends when the task is ready and does not throw an exception if the task is not ready. I prefer the first one, because it is simple and it also covers other exceptions like an RPC timeout. > SourceAlignment announceCombinedWatermark period task maybe lost > > > Key: FLINK-32362 > URL: https://issues.apache.org/jira/browse/FLINK-32362 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > > When we use sourcealignment,we also found there is another problem that > announceCombinedWatermark may throw a exception (like "subtask 25 is not > ready yet to receive events" , this subtask maybe under failover), which will > lead the period task not running any more (ThreadPoolExecutor will not > schedule the period task if it throw a exception) > I think we should increase the robustness of announceCombinedWatermark > function to avoid it throw any exception (if send fail, just wait next send) > (code see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
[ https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733349#comment-17733349 ] Cai Liuyang edited comment on FLINK-32362 at 6/16/23 6:38 AM: -- [~fanrui] Two ways: # the simple way, code like: {code:java} // code placeholder try { for (Integer subtaskId : subTaskIds) { context.sendEventToSourceOperator( subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); } } catch (Throwable ignore) { LOG.warn("Announce the newest combined watermark to source failed, task maybe during failover, wait next time to announce."); } {code} # add a trySendEventToSourceOperator() method that only sends when the task is ready and does not throw an exception if the task is not ready. I prefer the first one, because it is simple and it also covers other exceptions like an RPC timeout. was (Author: cailiuyang): Two ways: # the simple way, code like: {code:java} // code placeholder try { for (Integer subtaskId : subTaskIds) { context.sendEventToSourceOperator( subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); } } catch (Throwable ignore) { LOG.warn("Announce the newest combined watermark to source failed, task maybe during failover, wait next time to announce."); } {code} # add a trySendEventToSourceOperator() method that only sends when the task is ready and does not throw an exception if the task is not ready. 
I prefer the first one, because it is simple and it also covers other exceptions like an RPC timeout. > SourceAlignment announceCombinedWatermark period task maybe lost > > > Key: FLINK-32362 > URL: https://issues.apache.org/jira/browse/FLINK-32362 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > > When we use sourcealignment,we also found there is another problem that > announceCombinedWatermark may throw a exception (like "subtask 25 is not > ready yet to receive events" , this subtask maybe under failover), which will > lead the period task not running any more (ThreadPoolExecutor will not > schedule the period task if it throw a exception) > I think we should increase the robustness of announceCombinedWatermark > function to avoid it throw any exception (if send fail, just wait next send) > (code see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
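The failure mode discussed in this thread can be reproduced without Flink: a ScheduledExecutorService silently cancels a fixed-rate task after one run throws, while wrapping the task body in try/catch (option 1 above) keeps the schedule alive. A minimal sketch:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskLoss {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger unguarded = new AtomicInteger();
        AtomicInteger guarded = new AtomicInteger();

        // Throws on the first run -> the executor never runs it again,
        // mirroring how announceCombinedWatermark stops after one failure.
        pool.scheduleAtFixedRate(() -> {
            unguarded.incrementAndGet();
            throw new IllegalStateException("subtask 25 is not ready yet to receive events");
        }, 0, 10, TimeUnit.MILLISECONDS);

        // Same failing call, but caught inside the task: the schedule survives.
        pool.scheduleAtFixedRate(() -> {
            try {
                guarded.incrementAndGet();
                throw new IllegalStateException("subtask 25 is not ready yet to receive events");
            } catch (Throwable t) {
                // log and wait for the next announce attempt
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(200);
        pool.shutdownNow();
        System.out.println("unguarded runs: " + unguarded.get()); // 1: cancelled after the throw
        System.out.println("guarded runs:   " + guarded.get());   // typically many runs
    }
}
```

This is why the outer try/catch (or a non-throwing trySendEventToSourceOperator) is enough to keep the periodic announce task scheduled.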
[jira] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
[ https://issues.apache.org/jira/browse/FLINK-32362 ] Cai Liuyang deleted comment on FLINK-32362: - was (Author: cailiuyang): [~fanrui] Two ways: > SourceAlignment announceCombinedWatermark period task maybe lost > > > Key: FLINK-32362 > URL: https://issues.apache.org/jira/browse/FLINK-32362 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > > When we use sourcealignment,we also found there is another problem that > announceCombinedWatermark may throw a exception (like "subtask 25 is not > ready yet to receive events" , this subtask maybe under failover), which will > lead the period task not running any more (ThreadPoolExecutor will not > schedule the period task if it throw a exception) > I think we should increase the robustness of announceCombinedWatermark > function to avoid it throw any exception (if send fail, just wait next send) > (code see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
[ https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733345#comment-17733345 ] Cai Liuyang commented on FLINK-32362: - [~fanrui] Two ways: > SourceAlignment announceCombinedWatermark period task maybe lost > > > Key: FLINK-32362 > URL: https://issues.apache.org/jira/browse/FLINK-32362 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > > When we use sourcealignment,we also found there is another problem that > announceCombinedWatermark may throw a exception (like "subtask 25 is not > ready yet to receive events" , this subtask maybe under failover), which will > lead the period task not running any more (ThreadPoolExecutor will not > schedule the period task if it throw a exception) > I think we should increase the robustness of announceCombinedWatermark > function to avoid it throw any exception (if send fail, just wait next send) > (code see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task will be scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that we should schedule the announceCombinedWatermark task in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager encounters a failover and automatically recovers the job, it creates the SourceCoordinator twice: * The first one is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first sourceCoordinator but does not start it. * The second one is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old sourceCoordinator and initializes a new one; but because the first sourceCoordinator was never started (SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator will not be fully closed. And we also found another problem, see jira: https://issues.apache.org/jira/browse/FLINK-32362 was: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task will be scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. 
The reason, I think, is that we should schedule the announceCombinedWatermark task in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager encounters a failover and automatically recovers the job, it creates the SourceCoordinator twice: * The first one is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first sourceCoordinator but does not start it. * The second one is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old sourceCoordinator and initializes a new one; but because the first sourceCoordinator was never started (SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator will not be fully closed. -And we also found there is another problem: announceCombinedWatermark may throw an exception (like "subtask 25 is not ready yet to receive events"; this subtask may be under failover), leading to the periodic task not running any more (ThreadPoolExecutor will not schedule the periodic task again if it throws an exception); I think we should increase the robustness of the announceCombinedWatermark function to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] )- (see jira: https://issues.apache.org/jira/browse/FLINK-32362) > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we try SourceAlignment feature, we found there will be a duplicated > announceCombinedWatermark task will be scheduled after JobManager failover > and auto recover job from checkpoint. 
> The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed. > > And we also found another problem, see jira: > https://issues.apache.org/jira/browse/FLINK-32362 -- This message was sent by Atlassian Jira (v8.20.10#820010)
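The proposed fix can be sketched with stand-in classes (hypothetical names, not the actual Flink SourceCoordinator): move the scheduleAtFixedRate call out of the constructor and into start(), so the first coordinator instance, which is constructed for the ExecutionGraph but never started, schedules nothing:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for SourceCoordinator: scheduling happens in start(), not in the
// constructor, so a coordinator that is constructed but never started never
// runs the announceCombinedWatermark task.
class AlignmentCoordinator implements AutoCloseable {
    static final AtomicInteger ANNOUNCEMENTS = new AtomicInteger();

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announceTask; // null until start() is called

    // Constructor only wires dependencies; it must NOT schedule the periodic task.
    AlignmentCoordinator() {}

    void start() {
        // Only a started coordinator announces watermarks periodically.
        announceTask = executor.scheduleAtFixedRate(
                ANNOUNCEMENTS::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (announceTask != null) {
            announceTask.cancel(false);
        }
        executor.shutdownNow();
    }
}
```

With scheduling in the constructor, both coordinator instances created during recovery would run the task concurrently; with scheduling in start(), only the restored (started) instance does, even though the first instance is never fully closed.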
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. 
-And we also found another problem: announceCombinedWatermark may throw an exception (like "subtask 25 is not ready yet to receive events"; that subtask may be failing over), which stops the periodic task from ever running again (a ThreadPoolExecutor will not reschedule a periodic task once it throws an exception); I think we should increase the robustness of the announceCombinedWatermark function to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] )- (see jira: https://issues.apache.org/jira/browse/FLINK-32362) was: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. 
And we also found another problem: announceCombinedWatermark may throw an exception (like "subtask 25 is not ready yet to receive events"; that subtask may be failing over), which stops the periodic task from ever running again (a ThreadPoolExecutor will not reschedule a periodic task once it throws an exception); I think we should increase the robustness of the announceCombinedWatermark function to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] ) > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old
[jira] [Created] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost
Cai Liuyang created FLINK-32362: --- Summary: SourceAlignment announceCombinedWatermark period task maybe lost Key: FLINK-32362 URL: https://issues.apache.org/jira/browse/FLINK-32362 Project: Flink Issue Type: Bug Affects Versions: 1.16.0 Reporter: Cai Liuyang When we use SourceAlignment, we also found another problem: announceCombinedWatermark may throw an exception (like "subtask 25 is not ready yet to receive events"; that subtask may be failing over), which stops the periodic task from ever running again (a ThreadPoolExecutor will not reschedule a periodic task once it throws an exception). I think we should increase the robustness of the announceCombinedWatermark function so that it never throws (if a send fails, just wait for the next send) (code see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] )
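The failure mode is easy to reproduce with a plain ScheduledExecutorService (a sketch, not Flink code): once one run of a periodic task throws, the executor silently suppresses all subsequent executions, so the guard is to catch everything inside the task and simply retry on the next tick:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates why an exception in announceCombinedWatermark kills the
// periodic task, and how catching inside the task keeps it alive.
class PeriodicTaskGuard {
    static int[] demo() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        AtomicInteger unguarded = new AtomicInteger();
        AtomicInteger guarded = new AtomicInteger();
        CountDownLatch fiveGuardedTicks = new CountDownLatch(5);

        // Unguarded: scheduleAtFixedRate suppresses all further executions of a
        // periodic task as soon as one run throws, so this runs exactly once.
        executor.scheduleAtFixedRate(() -> {
            unguarded.incrementAndGet();
            throw new IllegalStateException("subtask 25 is not ready yet to receive events");
        }, 0, 10, TimeUnit.MILLISECONDS);

        // Guarded: catch inside the task; a failed send just waits for the next
        // period, as proposed for announceCombinedWatermark.
        executor.scheduleAtFixedRate(() -> {
            try {
                guarded.incrementAndGet();
                throw new IllegalStateException("subtask 25 is not ready yet to receive events");
            } catch (RuntimeException e) {
                // log and retry on the next tick instead of killing the task
            } finally {
                fiveGuardedTicks.countDown();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        fiveGuardedTicks.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return new int[] {unguarded.get(), guarded.get()};
    }
}
```

The unguarded counter stays at 1 forever, while the guarded task keeps being rescheduled despite failing on every tick.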
[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733278#comment-17733278 ] Cai Liuyang commented on FLINK-32316: - [~fanrui] OK, please assign it to me, thanks~ > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed. 
> > And we also found another problem: announceCombinedWatermark may > throw an exception (like "subtask 25 is not ready yet to receive events"; > that subtask may be failing over), which stops the periodic task from ever > running again (a ThreadPoolExecutor will not reschedule a periodic task once > it throws an exception); I think we should increase the robustness of the > announceCombinedWatermark function to cover this case (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] > )
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. 
And we also found another problem: announceCombinedWatermark may throw an exception (like "subtask 25 is not ready yet to receive events"; that subtask may be failing over), which stops the periodic task from ever running again (a ThreadPoolExecutor will not reschedule a periodic task once it throws an exception); I think we should increase the robustness of the announceCombinedWatermark function to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] ) was: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. 
> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed. > > And we also found another problem: announceCombinedWatermark may > throw an exception (like "subtask 25 is not ready yet to receive events"; > that subtask may be failing over), which stops the periodic task from ever > running again (a ThreadPoolExecutor will not reschedule a periodic task once > it throws an exception); I think we should increase the robustness of the > announceCombinedWatermark
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. was: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. 
The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. 
> * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed.
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph; this initializes the first SourceCoordinator but does not start it. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. was: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph. 
* The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed. > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph; this initializes the first SourceCoordinator but does > not start it. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed.
[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731608#comment-17731608 ] Cai Liuyang commented on FLINK-32316: - [~pnowojski] please take a look? thanks~ > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we tried the SourceAlignment feature, we found that a duplicated > announceCombinedWatermark task is scheduled after the JobManager fails over > and automatically recovers the job from a checkpoint. > The reason, I think, is that the announceCombinedWatermark task should be > scheduled in the SourceCoordinator::start function, not in the SourceCoordinator > constructor (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when the JobManager fails over and automatically recovers the job, it > creates the SourceCoordinator twice: > * The first is created when the JobMaster builds the > DefaultExecutionGraph. > * The second is created when the JobMaster calls the > restoreLatestCheckpointedStateInternal method, which resets the old > SourceCoordinator and initializes a new one; because the first > SourceCoordinator was never started (a SourceCoordinator is started just before > SchedulerBase::startScheduling), it is not fully closed.
[jira] [Created] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
Cai Liuyang created FLINK-32316: --- Summary: Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover Key: FLINK-32316 URL: https://issues.apache.org/jira/browse/FLINK-32316 Project: Flink Issue Type: Bug Affects Versions: 1.16.0 Reporter: Cai Liuyang When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, it creates the SourceCoordinator twice: * The first is created when the JobMaster builds the DefaultExecutionGraph. * The second is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; because the first SourceCoordinator was never started (a SourceCoordinator is started just before SchedulerBase::startScheduling), it is not fully closed.
[jira] [Closed] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang closed FLINK-29577. --- Resolution: Fixed > Disable rocksdb wal when restore from full snapshot > --- > > Key: FLINK-29577 > URL: https://issues.apache.org/jira/browse/FLINK-29577 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > Labels: pull-request-available > Attachments: image-2022-10-20-16-08-15-746.png, > image-2022-10-20-16-11-04-211.png > > > For now, RocksDBFullRestoreOperation and > RocksDBHeapTimersFullRestoreOperation do not pass a RocksDB WriteOptions to > RocksDBWriteBatchWrapper when restoring kv data, so the wrapper uses > RocksDBWriteBatchWrapper's default WriteOptions (which does not disable the RocksDB WAL > explicitly, see the code below). So during a restore from a full snapshot the WAL is > enabled, which uses more disk and may affect RocksDB write performance while > restoring. > > {code:java} > // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't > // pass WriteOptions to RocksDBWriteBatchWrapper (null as default) > private void restoreKVStateData( > ThrowingIterator<KeyGroup> keyGroups, > Map<Integer, ColumnFamilyHandle> columnFamilies, > Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> > restoredPQStates) > throws IOException, RocksDBException, StateMigrationException { > // for all key-groups in the current state handle... > try (RocksDBWriteBatchWrapper writeBatchWrapper = > new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), > writeBatchSize)) { > HeapPriorityQueueSnapshotRestoreWrapper<?> > restoredPQ = null; > ColumnFamilyHandle handle = null; >.. > } > // Second: RocksDBWriteBatchWrapper::flush doesn't disable the WAL > // explicitly when the user doesn't pass WriteOptions to RocksDBWriteBatchWrapper > public void flush() throws RocksDBException { > if (options != null) { > db.write(options, batch); > } else { > // use the default WriteOptions, if wasn't provided. 
> try (WriteOptions writeOptions = new WriteOptions()) { > db.write(writeOptions, batch); > } > } > batch.clear(); > } > {code} > > > As we know, RocksDB's WAL is useless for Flink, so I think we can disable the > WAL in RocksDBWriteBatchWrapper's default WriteOptions.
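The proposed default can be sketched without the RocksDB JNI dependency; the WriteOptions class below is a minimal local stand-in for org.rocksdb.WriteOptions (the real class also exposes setDisableWAL/disableWAL), and WriteBatchWrapperSketch is a hypothetical model of the wrapper's flush path, not the actual Flink class:

```java
// Stand-in for org.rocksdb.WriteOptions, so this sketch runs without RocksDB.
class WriteOptions {
    private boolean disableWAL;

    WriteOptions setDisableWAL(boolean flag) {
        this.disableWAL = flag;
        return this;
    }

    boolean disableWAL() {
        return disableWAL;
    }
}

// Sketch of the proposed RocksDBWriteBatchWrapper behavior: when the caller
// supplied no WriteOptions, fall back to a default that disables the WAL
// instead of RocksDB's implicit default (WAL enabled).
class WriteBatchWrapperSketch {
    private final WriteOptions options; // may be null, as in the current wrapper

    WriteBatchWrapperSketch(WriteOptions options) {
        this.options = options;
    }

    WriteOptions effectiveFlushOptions() {
        if (options != null) {
            return options; // caller-provided options win
        }
        // Proposed default: explicitly disable the (useless-for-Flink) WAL.
        return new WriteOptions().setDisableWAL(true);
    }
}
```

With this default, a restore that never passes WriteOptions writes no WAL, while callers that do pass their own options keep full control.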
[jira] [Comment Edited] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620889#comment-17620889 ] Cai Liuyang edited comment on FLINK-29577 at 10/20/22 8:11 AM: --- [~ym] This is my test case: 1. disable RocksDB managed memory 2. use the RocksDB full snapshot strategy 3. the KeyProcessor has one subtask and five states (state1, state2, state3, state4, state5); the total state size is 2GB 4. state1 is small: its total size is less than the RocksDB default memtable size (64M), e.g. state1 has just one record. After restoring, the RocksDB WAL cannot be GCed (see png1); I guess the reason is that state1's memtable is never flushed, and the WAL can only be GCed once state1's memtable is flushed (a RocksDB full snapshot only takes a snapshot). During my test, I found that if state1 ~ state4 are big but state5 is small (only one record), then there is only one WAL file (see png2). In my test, disabling the WAL doesn't improve restore speed; the two cases (WAL disabled / enabled during restore) are almost the same. !image-2022-10-20-16-08-15-746.png! !image-2022-10-20-16-11-04-211.png! was (Author: cailiuyang): [~ym] This is my test case: 1. disable RocksDB managed memory 2. use the RocksDB full snapshot strategy 3. the KeyProcessor has one subtask and five states (state1, state2, state3, state4, state5); the total state size is 2GB 4. state1 is small: its total size is less than the RocksDB default memtable size (64M), e.g. state1 has just one record. After restoring, the RocksDB WAL cannot be GCed (see png1); I guess the reason is that state1's memtable is never flushed, and the WAL can only be GCed once state1's memtable is flushed (a RocksDB full snapshot only takes a snapshot). During my test, I found that if state1 ~ state4 are big but state5 is small (only one record), then there is only one WAL file (see png2). In my test, disabling the WAL doesn't improve restore speed; the two cases (WAL disabled / enabled during restore) are almost the same. 
!image-2022-10-20-16-08-15-746.png!!image-2022-10-20-16-08-54-359.png! > Disable rocksdb wal when restore from full snapshot > --- > > Key: FLINK-29577 > URL: https://issues.apache.org/jira/browse/FLINK-29577 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Cai Liuyang >Assignee: Cai Liuyang >Priority: Major > Labels: pull-request-available > Attachments: image-2022-10-20-16-08-15-746.png, > image-2022-10-20-16-11-04-211.png > > > For now, RocksDBFullRestoreOperation and > RocksDBHeapTimersFullRestoreOperation do not pass a RocksDB WriteOptions to > RocksDBWriteBatchWrapper when restoring kv data, so the wrapper uses > RocksDBWriteBatchWrapper's default WriteOptions (which does not disable the RocksDB WAL > explicitly, see the code below). So during a restore from a full snapshot the WAL is > enabled, which uses more disk and may affect RocksDB write performance while > restoring. > > {code:java} > // First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't > // pass WriteOptions to RocksDBWriteBatchWrapper (null as default) > private void restoreKVStateData( > ThrowingIterator<KeyGroup> keyGroups, > Map<Integer, ColumnFamilyHandle> columnFamilies, > Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> > restoredPQStates) > throws IOException, RocksDBException, StateMigrationException { > // for all key-groups in the current state handle... > try (RocksDBWriteBatchWrapper writeBatchWrapper = > new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), > writeBatchSize)) { > HeapPriorityQueueSnapshotRestoreWrapper<?> > restoredPQ = null; > ColumnFamilyHandle handle = null; >.. > } > // Second: RocksDBWriteBatchWrapper::flush doesn't disable the WAL > // explicitly when the user doesn't pass WriteOptions to RocksDBWriteBatchWrapper > public void flush() throws RocksDBException { > if (options != null) { > db.write(options, batch); > } else { > // use the default WriteOptions, if wasn't provided. 
> try (WriteOptions writeOptions = new WriteOptions()) { > db.write(writeOptions, batch); > } > } > batch.clear(); > } > {code} > > > As we know, RocksDB's WAL is useless for Flink, so I think we can disable the > WAL in RocksDBWriteBatchWrapper's default WriteOptions.
[jira] [Commented] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620889#comment-17620889 ] Cai Liuyang commented on FLINK-29577: - [~ym] This is my test case: 1. disable RocksDB managed memory 2. use the RocksDB full snapshot strategy 3. the KeyProcessor has one subtask and five states (state1, state2, state3, state4, state5); the total state size is 2GB 4. state1 is small: its total size is less than the RocksDB default memtable size (64M), e.g. state1 has just one record. After restoring, the RocksDB WAL cannot be GCed (see png1); I guess the reason is that state1's memtable is never flushed, and the WAL can only be GCed once state1's memtable is flushed (a RocksDB full snapshot only takes a snapshot). During my test, I found that if state1 ~ state4 are big but state5 is small (only one record), then there is only one WAL file (see png2). In my test, disabling the WAL doesn't improve restore speed; the two cases (WAL disabled / enabled during restore) are almost the same. !image-2022-10-20-16-08-15-746.png!!image-2022-10-20-16-08-54-359.png! 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-29577: Attachment: image-2022-10-20-16-08-15-746.png
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-29577: Attachment: (was: image-2022-10-12-20-15-11-219.png)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-29577: Attachment: image-2022-10-12-20-15-11-219.png
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-29577: Description: Currently, RocksDBFullRestoreOperation and RocksDBHeapTimersFullRestoreOperation don't pass a RocksDB WriteOptions to RocksDBWriteBatchWrapper when restoring kv-data, so the wrapper falls back to its default WriteOptions, which does not disable the RocksDB WAL explicitly (see code below). As a result, the WAL stays enabled while restoring from a full snapshot, which uses extra disk space and may hurt RocksDB write performance during the restore.
{code:java}
// First: RocksDBHeapTimersFullRestoreOperation::restoreKVStateData() doesn't
// pass WriteOptions to RocksDBWriteBatchWrapper (null by default)
private void restoreKVStateData(
        ThrowingIterator<KeyGroup> keyGroups,
        Map<Integer, ColumnFamilyHandle> columnFamilies,
        Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
        throws IOException, RocksDBException, StateMigrationException {
    // for all key-groups in the current state handle...
    try (RocksDBWriteBatchWrapper writeBatchWrapper =
            new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
        HeapPriorityQueueSnapshotRestoreWrapper<?> restoredPQ = null;
        ColumnFamilyHandle handle = null;
        ...
}

// Second: RocksDBWriteBatchWrapper::flush doesn't disable the WAL explicitly
// when the caller doesn't pass WriteOptions to RocksDBWriteBatchWrapper
public void flush() throws RocksDBException {
    if (options != null) {
        db.write(options, batch);
    } else {
        // use the default WriteOptions, if none was provided
        try (WriteOptions writeOptions = new WriteOptions()) {
            db.write(writeOptions, batch);
        }
    }
    batch.clear();
}
{code}
As we know, RocksDB's WAL is useless for Flink (state durability comes from checkpoints), so I think we can disable the WAL in RocksDBWriteBatchWrapper's default WriteOptions.
was: (previous revision of the same description; the only change is that "affect rocksdb-write-performance" became "maybe affect rocksdb-write-performance")
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
[ https://issues.apache.org/jira/browse/FLINK-29577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17615862#comment-17615862 ] Cai Liuyang commented on FLINK-29577: [~yunta] Sorry, I only observed that disk usage doubled during restore; I didn't test write performance, but I think disabling the WAL may improve performance somewhat (if I'm wrong, please correct me).
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29577) Disable rocksdb wal when restore from full snapshot
Cai Liuyang created FLINK-29577: --- Summary: Disable rocksdb wal when restore from full snapshot Key: FLINK-29577 URL: https://issues.apache.org/jira/browse/FLINK-29577 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Cai Liuyang
(Initial description: same text and code as the issue description quoted above.)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494359#comment-17494359 ] Cai Liuyang commented on FLINK-26080: Yeah, looks like the same problem, thanks [~pnowojski]. I'll read that issue carefully.
> PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
> ---
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Major
>
> In our production environment we encountered one abnormal case: an upstream task is backpressured while all of its downstream tasks are idle, and the job stays in this state until the checkpoint times out (aligned checkpoints). After analysing the case, we found a half-open socket (see [https://en.wikipedia.org/wiki/TCP_half-open]) that was already closed on the server side but still established on the client side. The sequence leading to this:
> 1. The NettyServer hits a ReadTimeoutException while reading from the channel, releases the NetworkSequenceViewReader (which is responsible for sending data to the PartitionRequestClient), and writes an ErrorResponse to the PartitionRequestClient. After the ErrorResponse write succeeds, the server closes the channel (the socket moves to fin_wait1).
> 2. The PartitionRequestClient never receives the ErrorResponse or the server's FIN, so the client keeps its socket in the established state and waits for a BufferResponse from the server (perhaps a kernel bug on our machine caused the ErrorResponse and FIN to be lost).
> 3. The server machine releases the socket after it stays in fin_wait1 for too long, but the socket on the client machine remains established, hence the half-open socket.
> To avoid this case, I think there are two methods:
> 1. Enable TCP keep-alive on the client (Flink already does): this also requires tuning the machine's tcp-keep-alive time (the default of 7200 seconds is too long).
> 2. Have the client use Netty's IdleStateHandler to detect whether the channel is idle (read or write); if it is, the client writes a ping message to the server to check whether the channel is really OK.
> Of the two, I recommend method 2, because adjusting the machine's tcp-keep-alive time would impact other services running on the same machine.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
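The decision logic behind method 2 — probe an idle channel with a ping, and treat an unanswered ping as a half-open socket — can be sketched without Netty. In a real client, Netty's IdleStateHandler would do the idle bookkeeping and fire IdleStateEvents; the `IdleProbe` class below is a hypothetical, self-contained model of just the state machine, not Flink or Netty code:

```java
// Model of the idle-probe state machine: track the last time anything was
// read from the channel; once the idle threshold passes, send one ping and
// wait. If a second idle period elapses with the ping still unanswered,
// the channel is presumed half-open and should be closed so the client
// can reconnect. Netty's IdleStateHandler provides the timing callbacks
// for real channels; here onTick() stands in for its IdleStateEvent.
class IdleProbe {
    private final long idleThresholdMillis;
    private long lastReadMillis;
    private boolean pingOutstanding = false;

    IdleProbe(long idleThresholdMillis, long nowMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
        this.lastReadMillis = nowMillis;
    }

    /** Called whenever data (or a pong) arrives on the channel. */
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
        pingOutstanding = false;
    }

    /** Periodic idle check; returns the action the client should take. */
    String onTick(long nowMillis) {
        if (nowMillis - lastReadMillis < idleThresholdMillis) {
            return "OK"; // traffic seen recently, nothing to do
        }
        if (!pingOutstanding) {
            pingOutstanding = true;
            lastReadMillis = nowMillis; // start waiting for the pong
            return "SEND_PING";
        }
        return "CLOSE_CHANNEL"; // ping unanswered: half-open socket
    }
}
```

A healthy but quiet server answers the ping, `onRead` clears the outstanding flag, and the channel stays up; a half-open socket never answers, so the probe escalates to closing the channel — exactly the failure mode described above that TCP keep-alive would otherwise only catch after hours.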
[jira] [Closed] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang closed FLINK-25664. --- Resolution: Abandoned
> Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
> ---
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Critical
> Labels: pull-request-available
>
> Currently there can be a case like this:
> # The PipelinedSubpartition has only one buffer, ending with an aligned-checkpoint barrier (isBlocked == false).
> # The CreditBasedSequenceNumberingViewReader polls this buffer and the PipelinedSubpartition becomes blocked (isBlocked == true).
> # Before the downstream resumes consumption, two finished buffers are added to this PipelinedSubpartition (nothing prevents adding buffers to a blocked PipelinedSubpartition):
> ## adding the first finished buffer does not notifyDataAvailable because isBlocked == true;
> ## adding the second finished buffer also does not notifyDataAvailable because isBlocked == true and finishedBuffers > 1.
> # The downstream resumes consumption and the PipelinedSubpartition is unblocked (isBlocked == false).
> # The OutputFlusher's call on the PipelinedSubpartition does not notifyDataAvailable because finishedBuffers > 1.
> In conclusion, there are three cases where notifyDataAvailable should be triggered:
> case 1: only one finished buffer (handled by add);
> case 2: only one unfinished buffer (handled by flush);
> case 3: more than one finished buffer, added while the PipelinedSubpartition was blocked (not handled).
> {code:java}
> // test code for this case; add it to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> } {code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490882#comment-17490882 ] Cai Liuyang commented on FLINK-25664: yeah, got it. I forgot this logic before, thanks ~
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490882#comment-17490882 ] Cai Liuyang edited comment on FLINK-25664 at 2/11/22, 12:20 PM: yeah, got it. I forgot this logic before, thanks ~ [~pnowojski] was (Author: cailiuyang): yeah, got it. I forgot this logic before, thanks ~
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found Half-opend-socket (see [https://en.wikipedia.org/wiki/TCP_half-open] ) which is already closed on server side but established on client side,lead to this: 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient. After writing ErrorResponse success, server will close the channel (socket will be transformed to fin_wait1 status) 2. PartitionRequestClient doesn't receive the ErrorResponse and server's FIN, so client will keep socket be establised status and waiting for BufferResponse from server (maybe our machine's kernel-bug lead to ErrorResponse and FIN lost ) 3. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status, and so lead to Half-opened-socket To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found Half-opend-socket (see [https://en.wikipedia.org/wiki/TCP_half-open] ) which is already closed on server side but established on client side,lead to this: 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient. After write ErrorResponse success(maybe only write to tcp socket buffer success), server will close the channel (socket will be transformed to fin_wait1 status) 2. PartitionRequestClient doesn't receive the ErrorResponse and server's FIN, so client will keep socket be establised status and waiting for BufferResponse from server (maybe our machine's kernel-bug lead to ErrorResponse and FIN lost ) 3. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status, and so lead to Half-opened-socket To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
> PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Major
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found Half-opend-socket (see [https://en.wikipedia.org/wiki/TCP_half-open] ) which is already closed on server side but established on client side,lead to this: 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient. After write ErrorResponse success(maybe only write to tcp socket buffer success), server will close the channel (socket will be transformed to fin_wait1 status) 2. PartitionRequestClient doesn't receive the ErrorResponse and server's FIN, so client will keep socket be establised status and waiting for BufferResponse from server (maybe our machine's kernel-bug lead to ErrorResponse and FIN lost ) 3. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status, and so lead to Half-opened-socket To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found [ Half-opend-socket| [https://en.wikipedia.org/wiki/TCP_half-open |https://en.wikipedia.org/wiki/TCP_half-open]] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found Half-opend-socket (see > [https://en.wikipedia.org/wiki/TCP_half-open] ) which is already closed on > server side but established on client side,lead to this: > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found [ Half-opend-socket| [https://en.wikipedia.org/wiki/TCP_half-open |https://en.wikipedia.org/wiki/TCP_half-open]] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found [half-opend socket|[https://en.wikipedia.org/wiki/TCP_half-open]] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found [ Half-opend-socket| > [https://en.wikipedia.org/wiki/TCP_half-open > |https://en.wikipedia.org/wiki/TCP_half-open]] which is already closed on > server side, but established on client side > 1. NettyServer encounter ReadTimeoutException when
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found [half-opend socket|[https://en.wikipedia.org/wiki/TCP_half-open]] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found[ half-opend socket|[https://en.wikipedia.org/wiki/TCP_half-open],] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found [half-opend > socket|[https://en.wikipedia.org/wiki/TCP_half-open]] which is already > closed on server side, but established on client side > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will release the NetworkSequenceViewReader (which is >
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found[ half-opend socket|[https://en.wikipedia.org/wiki/TCP_half-open],] which is already closed on server side, but established on client side 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found[ half-opend > socket|[https://en.wikipedia.org/wiki/TCP_half-open],] which is already > closed on server side, but established on client side > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will release the NetworkSequenceViewReader (which is > responsable to send data to PartitionRequestClient) and write
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe network congestion or our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found the reason: (Machine's kernel we used may have bug that > will lost socket event ) > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will release the NetworkSequenceViewReader (which is > responsable to send data to PartitionRequestClient) and write ErrorResponse > to PartitionRequestClient; > 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe > network
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its all donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > upstreamTask is backpressured but its all donwStreamTask is idle, job > will keep this status until chk is timeout(use aligned chk); After we analyse > this case, we found the reason: (Machine's kernel we used may have bug that > will lost socket event ) > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will release the NetworkSequenceViewReader (which is > responsable to send data to PartitionRequestClient) and write ErrorResponse > to PartitionRequestClient; > 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our > machine's kernel-bug lead to this)
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
Of the two options, I recommend option 2, because adjusting the machine's tcp-keep-alive time would affect other services running on the same machine.
> PartitionRequest client use Netty's IdleStateHandler to monitor channel's
> status
>
> Key: FLINK-26080
> URL: https://issues.apache.org/jira/browse/FLINK-26080
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Major
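The idle-detection idea behind option 2 can be sketched as follows. This is a minimal, dependency-free illustration; the class and method names are hypothetical, not Flink's actual implementation. In a real implementation the check would be wired into the Netty pipeline via IdleStateHandler, which fires an IdleStateEvent that a handler turns into a ping write:

{code:java}
// Hypothetical sketch: decide when a partition-request client should probe
// the server with a ping, based on how long the channel has been idle.
public class ChannelIdleMonitor {
    private final long idleTimeoutMs;   // e.g. 2 minutes, far below the 7200 s TCP keep-alive default
    private long lastActivityMs;        // updated on every read/write event

    public ChannelIdleMonitor(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityMs = nowMs;
    }

    /** Called whenever the channel reads or writes data. */
    public void onActivity(long nowMs) {
        lastActivityMs = nowMs;
    }

    /** True if the channel has been idle long enough to warrant a ping probe. */
    public boolean shouldPing(long nowMs) {
        return nowMs - lastActivityMs >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        ChannelIdleMonitor monitor = new ChannelIdleMonitor(120_000, 0);
        monitor.onActivity(50_000);
        System.out.println(monitor.shouldPing(100_000)); // false: idle only 50 s
        System.out.println(monitor.shouldPing(200_000)); // true: idle 150 s >= 120 s
    }
}
{code}

If the ping write fails or no pong arrives within a deadline, the client would know the channel is dead (covering the lost-FIN case above) even though the socket still looks ESTABLISHED locally.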
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Description: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine was: In out production environment, we encounter one abnormal case: upstreamTask is backpressured but its donwStreamTask is idle, job will keep this status until chk is timeout(use aligned chk); After we analyse this case, we found the reason: (Machine's kernel we used may have bug that will lost socket event ) 1. NettyServer encounter ReadTimeoutException when read data from channel, then it will release the NetworkSequenceViewReader (which is responsable to send data to PartitionRequestClient) and write ErrorResponse to PartitionRequestClient; 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our machine's kernel-bug lead to this) 3. NettyServer after write ErrorResponse, it will close the channel (socket will be transformed to fin_wait1 status), but client machine doesn't receive the Server's fin, so it will treat the channel is ok, and will keep waiting for server's BufferReponse (But server is already release correlative NetworkSequenceViewReader) 4. Server machine will release the socket if it keep fin_wait1 status for two long time, but the socket on client machine is also under established status. To avoid this case,I think there are two methods: 1. Client enable TCP keep alive(flink is already enabled): this way should also need adjust machine's tcp-keep-alive time (tcp-keep-alive's default time is 7200 seconds, which is two long). 2. Client use netty‘s IdleStateHandler to detect whether channel is idle(read or write), if channel is idle, client will try to write pingMsg to server to detect whether channel is really ok. 
For the two methods, i recommend the method-2, because adjustment of machine's tcp-keep-alive time will have an impact on other service running on the same machine > PartitionRequest client use Netty's IdleStateHandler to monitor channel's > status > > > Key: FLINK-26080 > URL: https://issues.apache.org/jira/browse/FLINK-26080 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: Cai Liuyang >Priority: Major > > In out production environment, we encounter one abnormal case: > > upstreamTask is backpressured but its donwStreamTask is idle, job will keep > this status until chk is timeout(use aligned chk); After we analyse this > case, we found the reason: (Machine's kernel we used may have bug that will > lost socket event ) > 1. NettyServer encounter ReadTimeoutException when read data from > channel, then it will release the NetworkSequenceViewReader (which is > responsable to send data to PartitionRequestClient) and write ErrorResponse > to PartitionRequestClient; > 2. PartitionRequestClient doesn't receive the ErrorResponse (maybe our > machine's kernel-bug lead to this) > 3. NettyServer after
[jira] [Created] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
Cai Liuyang created FLINK-26080: --- Summary: PartitionRequest client use Netty's IdleStateHandler to monitor channel's status Key: FLINK-26080 URL: https://issues.apache.org/jira/browse/FLINK-26080 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Cai Liuyang
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26080) PartitionRequest client use Netty's IdleStateHandler to monitor channel's status
[ https://issues.apache.org/jira/browse/FLINK-26080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-26080: Affects Version/s: 1.14.3
[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-25664: Component/s: Runtime / Network
> Notify will be not triggered for PipelinedSubpartition if more than one
> buffer is added during isBlocked == true
>
> Key: FLINK-25664
> URL: https://issues.apache.org/jira/browse/FLINK-25664
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.14.3
> Reporter: Cai Liuyang
> Priority: Major
> Labels: pull-request-available
>
> Currently, the following case can occur:
> # The PipelinedSubPartition holds only one buffer, carrying an aligned checkpoint barrier (isBlocked == false).
> # The CreditBasedSequenceNumberingViewReader polls this buffer and the PipelinedSubPartition becomes blocked (isBlocked == true).
> # Before the downstream calls resumeConsumption, two finished buffers are added to this PipelinedSubPartition (there is no limit on adding buffers to a blocked PipelinedSubPartition):
> ## Adding the first finished buffer does not trigger notifyDataAvailable, because isBlocked == true.
> ## Adding the second finished buffer also does not trigger notifyDataAvailable, because isBlocked == true and there is more than one finished buffer.
> # The downstream calls resumeConsumption and the PipelinedSubPartition is unblocked (isBlocked == false).
> # The OutputFlusher's flush of the PipelinedSubPartition does not trigger notifyDataAvailable, because there is more than one finished buffer.
> In conclusion, there are three cases in which notifyDataAvailable should be triggered:
> case 1: there is exactly one finished buffer (handled by add)
> case 2: there is exactly one unfinished buffer (handled by flush)
> case 3: there is more than one finished buffer, added while the PipelinedSubPartition was blocked (not handled)
> {code:java}
> // test code for this case; add it to
> // org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
> @Test
> public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
>         throws Exception {
>     blockSubpartitionByCheckpoint(1);
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
>     assertEquals(1, availablityListener.getNumNotifications());
>     readView.resumeConsumption();
>     subpartition.flush();
>     assertEquals(2, availablityListener.getNumNotifications());
> }
> {code}
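The missed-notification path above can be reduced to a small decision table. The sketch below uses hypothetical names (it is not Flink's PipelinedSubpartition code) and simplifies the conditions to show why replaying the reported sequence never produces a notification:

{code:java}
// Hypothetical sketch of the notification decision described above.
public class NotifyDecision {

    /** Decision after a finished buffer is added: only the transition to
     *  exactly one finished buffer on an unblocked subpartition notifies. */
    public static boolean notifyOnAdd(boolean isBlocked, int finishedBuffers) {
        return !isBlocked && finishedBuffers == 1;
    }

    /** Decision when the OutputFlusher flushes: only a single, still
     *  unfinished buffer on an unblocked subpartition notifies. */
    public static boolean notifyOnFlush(boolean isBlocked, int finishedBuffers, boolean hasUnfinished) {
        return !isBlocked && finishedBuffers <= 1 && hasUnfinished;
    }

    public static void main(String[] args) {
        // Replay the reported sequence: blocked, add two finished buffers,
        // then unblock and flush -- no step ever notifies (case 3).
        boolean n1 = notifyOnAdd(true, 1);           // first add while blocked
        boolean n2 = notifyOnAdd(true, 2);           // second add while blocked
        boolean n3 = notifyOnFlush(false, 2, false); // flush after unblock
        System.out.println(n1 + " " + n2 + " " + n3); // all false: the notify is lost
    }
}
{code}

Under these simplified conditions, the fix amounts to also notifying on resumeConsumption (or on flush) when more than one finished buffer accumulated while the subpartition was blocked.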
[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-25664: Affects Version/s: 1.14.3
[jira] [Created] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
Cai Liuyang created FLINK-25664: --- Summary: Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true Key: FLINK-25664 URL: https://issues.apache.org/jira/browse/FLINK-25664 Project: Flink Issue Type: Bug Reporter: Cai Liuyang