Hi Jan,

It looks to me as if you might have different versions of the parquet-avro
dependency on your classpath. Could you check that only a single version of
the library ends up on the classpath?
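
One way to check this, as a rough sketch: an AbstractMethodError usually
means that a class compiled against one version of a library is running
against another version. The small helper below prints the jar from which
each Parquet class is loaded. The names WhichJar and locate are made up for
illustration; the class names are taken from your stack trace, and the
artifact names in the comments are where I would expect those classes to live.

```scala
object WhichJar {
  // Hypothetical helper: reports the location a class was loaded from.
  def locate(className: String): String =
    try {
      // initialize = false: only load the class, do not run static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      // getCodeSource can be null, e.g. for classes from the JDK itself.
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("(no code source, e.g. JDK class)")
    } catch {
      case _: ClassNotFoundException => "(not on classpath)"
      case e: LinkageError           => s"(failed to load: $e)"
    }

  def main(args: Array[String]): Unit = {
    val classes = Seq(
      "org.apache.parquet.column.impl.ColumnWriterV1", // expected in parquet-column
      "org.apache.parquet.hadoop.ParquetWriter",       // expected in parquet-hadoop
      "org.apache.parquet.avro.AvroWriteSupport"       // expected in parquet-avro
    )
    classes.foreach(c => println(s"$c -> ${locate(c)}"))
  }
}
```

This only reflects the classpath of the JVM it runs in, so it would have to
be called from within the job (for example from its main method) to say
something about the cluster. If the printed jars carry different Parquet
versions, that would point to the conflict. On the build side, running
mvn dependency:tree -Dincludes=org.apache.parquet should list which Parquet
artifacts and versions your project pulls in.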

Cheers,
Till

On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <
oelschle...@integration-factory.de> wrote:

> Hi all,
>
> I’m using Flink 1.11 with the DataStream API. I would like to write my
> results in Parquet format to HDFS.
>
> To do that, I generated an Avro SpecificRecord with the avro-maven-plugin:
>
> <plugin>
>     <groupId>org.apache.avro</groupId>
>     <artifactId>avro-maven-plugin</artifactId>
>     <version>1.8.2</version>
>     <executions>
>         <execution>
>             <phase>generate-sources</phase>
>             <goals>
>                 <goal>schema</goal>
>             </goals>
>             <configuration>
>                 <sourceDirectory>src/main/resources/avro/</sourceDirectory>
>                 <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
>                 <stringType>String</stringType>
>             </configuration>
>         </execution>
>     </executions>
> </plugin>
>
> Then I use the SpecificRecord in the StreamingFileSink:
>
> val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
>   .forBulkFormat(
>     new Path("hdfs://example.com:8020/data/"),
>     ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
>   )
>   .build()
>
> The job fails with the following error:
>
> java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
>     at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
>     at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
>     at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
>     at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>     at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
>     at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:34)
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:11)
>     at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
>
> What can I do to fix this?
>
> Best,
> Jan
