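Hi Jan,

it looks to me as if you have different versions of the parquet-avro dependencies on your classpath. The AbstractMethodError in your stack trace means that ColumnWriterV1 calls a writePage signature that the ColumnChunkPageWriter loaded at runtime does not implement, which typically happens when Parquet JARs of different versions are mixed. Could you make sure that only one version of the library ends up on your classpath?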
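
One rough way to check this from inside the JVM is to ask the class loader for every copy of the classes named in the stack trace. The sketch below is only an illustration: ClasspathCheck and its methods are names made up for this example, and it assumes the code runs with the same classpath as the failing job (for instance, called from the job itself).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or Parquet.
final class ClasspathCheck {

    // Class names copied from the stack trace in the quoted message.
    static final String[] SUSPECTS = {
        "org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
        "org.apache.parquet.column.impl.ColumnWriterV1",
        "org.apache.parquet.avro.AvroWriteSupport"
    };

    // Converts a fully qualified class name into its classpath resource name.
    static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns every location from which the class file is visible to the
    // context class loader (falling back to the system class loader).
    static List<URL> locations(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : SUSPECTS) {
            List<URL> found = locations(name);
            System.out.println(name + ": " + found.size() + " location(s)");
            for (URL url : found) {
                System.out.println("  " + url);
            }
        }
    }
}
```

If a class shows up in more than one location, or the Parquet JARs in the printed URLs carry different version numbers, that would point to the conflict.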

Cheers,
Till

On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <oelschle...@integration-factory.de> wrote:

> Hi all,
>
> I'm using Flink 1.11 with the DataStream API. I would like to write my
> results to HDFS in Parquet format.
>
> For this, I generated an Avro SpecificRecord with the avro-maven-plugin:
>
>     <plugin>
>         <groupId>org.apache.avro</groupId>
>         <artifactId>avro-maven-plugin</artifactId>
>         <version>1.8.2</version>
>         <executions>
>             <execution>
>                 <phase>generate-sources</phase>
>                 <goals>
>                     <goal>schema</goal>
>                 </goals>
>                 <configuration>
>                     <sourceDirectory>src/main/resources/avro/</sourceDirectory>
>                     <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
>                     <stringType>String</stringType>
>                 </configuration>
>             </execution>
>         </executions>
>     </plugin>
>
> Then I'm using the SpecificRecord in the StreamingFileSink:
>
>     val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
>       .forBulkFormat(
>         new Path("hdfs://example.com:8020/data/"),
>         ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
>       )
>       .build()
>
> The job fails with the following error:
>
> java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
>     at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
>     at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
>     at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
>     at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>     at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
>     at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:34)
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:11)
>     at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
>
> What can I do to fix this?
>
> Best,
> Jan