Default Flink S3 FileSource timeout due to large file listing

2023-09-25 Posted by Eleanore Jin
Hello Flink Community,
Flink version: 1.16.1, with ZooKeeper for HA.
My Flink application reads raw Parquet files hosted in S3, applies
transformations, and re-writes them to S3 under a different location.
Below is the code I use to read the Parquet files from S3:
```
// rowType, <bucket>, and <prefix> are placeholders for our actual Parquet schema
// (a RowType) and S3 location.
final Configuration configuration = new Configuration();
configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
    new ParquetColumnarRowInputFormat<>(
        configuration,
        rowType,                       // projected RowType of the Parquet files
        InternalTypeInfo.of(rowType),  // produced TypeInformation<RowData>
        100,                           // batch size
        true,                          // UTC timestamps
        true);                         // case-sensitive field matching

final FileSource<RowData> source = FileSource
    .forBulkFileFormat(format, new Path("s3://<bucket>/<prefix>/"))
    .build();

final DataStream<RowData> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");
```
I noticed the following:
1. My S3 directory, "s3://<bucket>/", can contain more than 1M files. The
Parquet files in this directory are partitioned by date and time, which makes
the folder structure deterministic, e.g.
"s3://<bucket>/partition_column_a/partition_column_b/2023-09-25--13/{1,2...N}.parquet".
I believe the default Flink FileSource is doing a listing on this large
directory and gets stuck waiting for the operation to complete; the Akka
connect timeout error messages in the TaskManager logs support this.
Additionally, the job runs successfully when I restrict the input to a
subfolder covering only an hour's data, based on the partitioning scheme
above. On my local machine, I also tried using the S3 CLI to recursively list
this directory, and the operation did not complete within an hour.

*Is this behavior expected based on Flink's S3 source implementation?* Looking
at the docs, one way to solve this would be to implement a custom split
enumerator that incrementally processes the subfolders in "s3://<bucket>/",
based on the partitioning scheme above.

*Are there any other approaches available?*
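
Short of implementing a custom split enumerator, a simpler variant of the same
idea would be to pass the hour-partition directories explicitly, since
FileSource.forBulkFileFormat accepts multiple paths. A minimal sketch, not
tested at scale; <bucket>, <partition_a>, <partition_b>, and the date are
placeholders, and "format" refers to the input format from the snippet above:
```
// Build explicit hour-partition paths for one day instead of pointing the
// source at the whole prefix.
final List<Path> hourPartitions = new ArrayList<>();
final LocalDate day = LocalDate.of(2023, 9, 25);
for (int h = 0; h < 24; h++) {
    hourPartitions.add(new Path(String.format(
        "s3://<bucket>/<partition_a>/<partition_b>/%s--%02d/", day, h)));
}

// forBulkFileFormat accepts multiple paths, so each hour directory is listed
// on its own rather than one recursive listing over the 1M+ file tree.
final FileSource<RowData> source = FileSource
    .forBulkFileFormat(format, hourPartitions.toArray(new Path[0]))
    .build();
```
The day or hour range could then be advanced incrementally as new partitions
arrive, rather than listing everything up front.
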
2. Following the code above, when I deserialize records from S3 I get records
of type BinaryRowData. However, when I use the same code in unit tests, with
MiniClusterWithClientResource, to read from a local Parquet file (not S3), I
get records of type GenericRowData.

*What is the reason for this discrepancy, and is it possible to force
deserialization to output GenericRowData?* Currently, I have written code to
convert BinaryRowData to GenericRowData, as our downstream ProcessFunctions
expect this type.
*Is there a better solution to transform BinaryRowData to GenericRowData?*
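
For reference, the kind of conversion I mean is a per-field copy driven by the
RowType. A minimal sketch, assuming the same rowType used to build the input
format and using RowData.createFieldGetter:
```
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/** Copies any RowData implementation (e.g. BinaryRowData) into a GenericRowData. */
public class ToGenericRowData implements MapFunction<RowData, RowData> {

    private final RowData.FieldGetter[] fieldGetters;

    public ToGenericRowData(RowType rowType) {
        final List<LogicalType> fieldTypes = rowType.getChildren();
        this.fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
        for (int i = 0; i < fieldTypes.size(); i++) {
            fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
        }
    }

    @Override
    public RowData map(RowData row) {
        final GenericRowData copy = new GenericRowData(row.getRowKind(), row.getArity());
        for (int i = 0; i < fieldGetters.length; i++) {
            copy.setField(i, fieldGetters[i].getFieldOrNull(row));
        }
        return copy;
    }
}
```
Applied as something like stream.map(new ToGenericRowData(rowType)) before the
downstream ProcessFunctions; I would still prefer a built-in way to get
GenericRowData directly, if one exists.
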

Thanks!
Eleanore