Hi all,
I have made good progress figuring out the root cause of this issue. The
details are in BEAM-11721. I asked some questions at the end of the Jira, and I
am duplicating them here for visibility. Thanks so much for the help and
support from the community!
1. It's not intuitive that we have to create an Avro schema for ParquetIO that
contains Spark-defined fields ("list", "element", etc.) when ingesting
Spark-created Parquet files (see the schema sketch below). Is it possible to
support the standard definition of the array type, like
{"type":"array","elementType":"integer","containsNull":true}? Could Beam do the
schema translation under the hood to spare users the hassle?
2. Taking a step back, why does ParquetIO require an Avro schema
specification when AvroParquetReader does not? I briefly looked at the
ParquetIO source code but have not figured it out yet.
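To make question 1 concrete, here is a sketch of the schema shape that
currently has to be written for a Spark-created file holding a nullable array
of ints. The names and nesting are my reconstruction of Parquet's three-level
list layout, not copied from the attached files: each array item is a record
wrapping an "element" field, mirroring the "list"/"element" groups in the
Parquet file.

{
  "type": "record",
  "name": "root",
  "fields": [
    {
      "name": "list",
      "type": ["null", {
        "type": "array",
        "items": {
          "type": "record",
          "name": "list",
          "fields": [
            {"name": "element", "type": ["null", "int"], "default": null}
          ]
        }
      }],
      "default": null
    }
  ]
}

By contrast, the natural way to express the same field is a plain array, e.g.
{"type": "array", "items": ["null", "int"]}, which is what question 1 is
asking Beam to accept.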
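And for question 2, this is roughly what bare AvroParquetReader usage looks
like; it derives the Avro schema from the Parquet file footer, so none has to
be passed in (the path argument and method name below are placeholders of
mine, not from the thread):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

// Sketch: dump a Parquet file without supplying an Avro schema.
static void dumpParquet(String path) throws java.io.IOException {
  // No schema argument: parquet-avro derives the Avro schema
  // from the metadata stored in the file footer.
  try (ParquetReader<GenericRecord> reader =
          AvroParquetReader.<GenericRecord>builder(new Path(path)).build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
      System.out.println(record);
    }
  }
}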
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 3:37 PM
To: Chamikara Jayalath <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Thanks @Chamikara Jayalath. I created this Jira:
https://issues.apache.org/jira/browse/BEAM-11721
From: Chamikara Jayalath <[email protected]>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Sounds like a bug. I think a JIRA with a test case will still be helpful.
On Fri, Jan 29, 2021 at 2:33 PM Tao Li <[email protected]> wrote:
@Chamikara Jayalath Sorry about the confusion, but I did more testing, and
using the Spark runner actually yields the same error:
java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
    at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
From: Chamikara Jayalath <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Thanks. It might be good to document this in case other users run into it as
well. Can you file a JIRA with the details?
On Fri, Jan 29, 2021 at 10:47 AM Tao Li <[email protected]> wrote:
OK, I think this issue is due to an incompatibility between the Parquet files
(created with Spark 2.4) and the Parquet version that ParquetIO depends on in
Beam 2.25. It seems to work after I switched to the Spark runner (from the
direct runner) and ran the Beam app in a Spark cluster. I assume that by doing
this I am using the Parquet jars from the Spark distribution directly, so
everything is compatible.
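For anyone trying to reproduce this workaround, the runner switch itself is
just a pipeline-options change. A minimal sketch (variable names are mine;
spark-submit flags and cluster configuration are omitted):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Select the Spark runner instead of the default direct runner.
SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline pipeline = Pipeline.create(options);
// ... apply ParquetIO.read(...) as before, then pipeline.run().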
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 7:45 AM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Hi community,
Can someone take a look at this issue? It is kind of a blocker for me right now.
I really appreciate your help!
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
BTW, I tried Avro 1.8 and 1.9, and both produce the same error, so we can
probably rule out an Avro issue.
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "[email protected]" <[email protected]>
Subject: Potential bug with ParquetIO.read when reading arrays
Hi Beam community,
I am seeing an error when reading an array field using ParquetIO. I was using
Beam 2.25 and the direct runner for testing. Is this a bug or a known issue? Am
I missing anything here? Please help me root-cause this issue. Thanks so much!
Attached are the Avro schema and the Parquet file. Below is the schema tree as
a quick visualization. The array field name is "list" and the element type is
int. You can see this schema defined in the avsc file as well.
root
|-- list: array (nullable = true)
| |-- element: integer (containsNull = true)
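For readers without the attachment, the avsc would look roughly like the
following (my reconstruction from the tree above; the attached file is
authoritative):

{
  "type": "record",
  "name": "root",
  "fields": [
    {
      "name": "list",
      "type": ["null", {"type": "array", "items": ["null", "int"]}],
      "default": null
    }
  ]
}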
The Beam code is very simple:
pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
Here is the error when running that code:
[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)