Hi Fatima,

I am not super familiar with Parquet but your issue seems to be
related to [1], which seems to be expected behaviour on the Parquet
side.
The reason for this behaviour seems to be the format of the parquet
files which store only the leaf fields but not the structure of the
groups, so if a group has no fields, its schema cannot be inferred.
Given this, I do not think that it is a bug but feel free to check
further and let us know if I am wrong.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/PARQUET-278

On Tue, Feb 11, 2020 at 11:20 PM Fatima Omer <fat...@dlvr.com> wrote:
>
> I have a java app that is using a flink SQL query to perform aggregations on 
> a data stream being read in from Kafka. Attached is the java file for 
> reference.
>
> The query results are being written to s3. I can write successfully in Json 
> format but when I try to use Parquet format, flink complains that min_ts is 
> an optional group. I have verified that min_ts can never be null in our 
> scheme of things.
>
> Would appreciate help on this. Thanks!
>
> Stack trace:
>
> Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a 
> schema with an empty group: optional group min_ts {
>
> }
>
>     at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
>
>     at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
>
>     at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
>
>     at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
>
>     at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>
>     at 
> org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
>
>     at 
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
>
>     at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>
>     at 
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
>
>     at 
> com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
>
>     at 
> com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
>
>     at 
> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
>
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
>
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
>
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>
>     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
>     ... 50 more
>
>
>

Reply via email to