I have a Java app that uses a Flink SQL query to perform aggregations
on a data stream read from Kafka. The Java file is attached for
reference.

The query results are written to S3. Writing in JSON format works, but
when I switch to Parquet format, the writer fails with an
InvalidSchemaException saying that min_ts is an empty optional group. I
have verified that min_ts can never be null in our data.

I would appreciate any help with this. Thanks!

Stack trace:

Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group min_ts {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    ... 50 more

Attachment: StatsPipeline.java
Description: Binary data
