Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with
nested columns. If your parquet file stores Avro records. You might want to
try e.g. Avro Generic record[1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record

Best regards,
Jing


On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <
user@flink.apache.org> wrote:

> Hi, folks
>
>
>
> I’m using flink 1.16.0, and I would like to read Parquet file (attached),
> that has schema [1].
>
>
>
> I could read this file with Spark, but when I try to read it with Flink
> 1.16.0 (program attached) using schema [2]
>
> I got IndexOutOfBoundsException [3]
>
>
>
> My code, and parquet file are attached. Is it:
>
> ·         the problem, described in FLINK-28867
> <https://issues.apache.org/jira/browse/FLINK-28867> or
>
> ·         something new, that deserve a separate Jira, or
>
> ·         something wrong with my code?
>
>
>
> [1]: Parquet Schema
>
>
>
> root
>
> |-- amount: decimal(38,9) (nullable = true)
>
> |-- connectionAccountId: string (nullable = true)
>
> |-- sourceEntity: struct (nullable = true)
>
> |    |-- extendedProperties: array (nullable = true)
>
> |    |    |-- element: struct (containsNull = true)
>
> |    |    |    |-- key: string (nullable = true)
>
> |    |    |    |-- value: string (nullable = true)
>
> |    |-- sourceAccountId: string (nullable = true)
>
> |    |-- sourceEntityId: string (nullable = true)
>
> |    |-- sourceEntityType: string (nullable = true)
>
> |    |-- sourceSystem: string (nullable = true)
>
>
>
>
>
> [2]: Schema used in Flink:
>
>
>
>     static RowType getSchema()
>
>     {
>
>         RowType elementType = RowType.of(
>
>             new LogicalType[] {
>
>                 new VarCharType(VarCharType.MAX_LENGTH),
>
>                 new VarCharType(VarCharType.MAX_LENGTH)
>
>             },
>
>             new String[] {
>
>                 "key",
>
>                 "value"
>
>             }
>
>         );
>
>
>
>         RowType element = RowType.of(
>
>             new LogicalType[] { elementType },
>
>             new String[] { "element" }
>
>         );
>
>
>
>         RowType sourceEntity = RowType.of(
>
>             new LogicalType[] {
>
>                 new ArrayType(element),
>
>                 new VarCharType(),
>
>                 new VarCharType(),
>
>                 new VarCharType(),
>
>                 new VarCharType(),
>
>             },
>
>             new String[] {
>
>                 "extendedProperties",
>
>                 "sourceAccountId",
>
>                 "sourceEntityId",
>
>                 "sourceEntityType",
>
>                 "sourceSystem"
>
>             }
>
>         );
>
>
>
>         return  RowType.of(
>
>             new LogicalType[] {
>
>                 new DecimalType(),
>
>                 new VarCharType(),
>
>                 sourceEntity
>
>             },
>
>             new String[] {
>
>                 "amount",
>
>                 "connectionAccountId",
>
>                 "sourceEntity",
>
>         });
>
>     }
>
>
>
> [3]:  Execution Exception:
>
>
> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager -
> Received uncaught exception.
>
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
> exception while polling the records
>
>     at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>
>     ...
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for
> length 1
>
>     at
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>
>     at
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>
>     at
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>
>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>
>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>
>     at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>
>     at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>
>     at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>
>     at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>
>     at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>
>     at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
>
>     at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
>
>     at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
>
>     at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
>
>     at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
>
>     at
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>
>     at
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>
>     at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>
>     at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>
>     ... 6 common frames omitted
>
>
>
> Thanks
>
>
>

Reply via email to