Hi at all,

i'm using Flink 1.11 with the datastream api. I would like to write my results 
in parquet format into HDFS.

Therefore i generated an Avro SpecificRecord with the avro-maven-plugin:



<plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            
<sourceDirectory>src/main/resources/avro/</sourceDirectory>
                            
<outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


Then  I'm using the SpecificRecord in the StreamingFileSink:



val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://example.com:8020/data/"),
    ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
  )
  .build()


The job cancels with the following error:


java.lang.AbstractMethodError: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
    at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
    at 
org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
    at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
    at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at 
de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:34)
    at 
de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:11)
    at 
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)



What can I do to fix this?


Best,
Jan

HINWEIS: Dies ist eine vertrauliche Nachricht und nur f?r den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zug?nglich zu machen. Sollten Sie diese Nachricht irrt?mlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.

Reply via email to