Thanks. This might be worth documenting in case other users run into it
as well. Can you file a JIRA with the details?


On Fri, Jan 29, 2021 at 10:47 AM Tao Li <[email protected]> wrote:

> OK, I think this issue is due to an incompatibility between the parquet files
> (created with Spark 2.4) and the parquet version that ParquetIO 2.25 depends
> on. It seems to work after I switched from the direct runner to the Spark
> runner and ran the Beam app in a Spark cluster. I assume that by doing this I
> am using the parquet jars from the Spark distribution directly, so
> everything is now compatible.
>
>
>
> *From: *Tao Li <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Friday, January 29, 2021 at 7:45 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: Potential bug with ParquetIO.read when reading arrays
>
>
>
> Hi community,
>
>
>
> Can someone take a look at this issue? It is kind of a blocker for me right
> now. I really appreciate your help!
>
>
>
> *From: *Tao Li <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Thursday, January 28, 2021 at 6:13 PM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: Potential bug with ParquetIO.read when reading arrays
>
>
>
> BTW I tried avro 1.8 and 1.9 and both have the same error. So we can
> probably rule out any avro issue.
>
>
>
> *From: *Tao Li <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Thursday, January 28, 2021 at 9:07 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Potential bug with ParquetIO.read when reading arrays
>
>
>
> Hi Beam community,
>
>
>
> I am seeing an error when reading an array field using ParquetIO. I was
> using beam 2.25 and the direct runner for testing. Is this a bug or a known
> issue? Am I missing anything here? Please help me root cause this issue.
> Thanks so much!
>
>
>
> Attached are the avro schema and the parquet file. Below is the schema
> tree as a quick visualization. The array field name is “list” and the
> element type is int. You can see this schema defined in the avsc file as
> well.
>
>
>
> root
> |-- list: array (nullable = true)
> |    |-- element: integer (containsNull = true)
>
>
>
> The beam code is very simple:
> pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
>
>
>
> Here is the error when running that code:
>
>
>
> [direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
>
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
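
For the JIRA, it may help to spell out what the trace above shows: the failure happens while AvroCoder encodes the record that was already read. GenericDatumWriter.writeArray hands each array element to a writer that expects a Number, because the schema says the elements are ints, but the element it receives is a GenericData$Record. Below is a rough, dependency-free Java sketch of that failure mode. It does not use Avro, Parquet, or Beam. The class name, the helper methods, and the single-field "element" wrapper are all hypothetical stand-ins, and the thread does not confirm that the elements are wrapped this way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListElementCastSketch {

    // Hypothetical stand-in for a generic record: a single-field wrapper
    // around the value (an assumption, not confirmed by the thread).
    static Map<String, Object> wrapInRecord(int value) {
        Map<String, Object> record = new HashMap<>();
        record.put("element", value);
        return record;
    }

    // Mimics a writer that trusts the schema ("array of int") and casts
    // every element to Number before writing it.
    static int writeIntArray(List<?> elements) {
        int written = 0;
        for (Object element : elements) {
            // Throws ClassCastException if the element is a wrapper, not a number.
            int ignored = ((Number) element).intValue();
            written++;
        }
        return written;
    }

    // Returns true if every element can be written as an int,
    // false if the cast fails.
    static boolean canWriteAsInts(List<?> elements) {
        try {
            writeIntArray(elements);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Object> plain = new ArrayList<>();
        plain.add(1);
        plain.add(2);

        List<Object> wrapped = new ArrayList<>();
        wrapped.add(wrapInRecord(1));

        System.out.println("plain ints writable:   " + canWriteAsInts(plain));
        System.out.println("wrapped ints writable: " + canWriteAsInts(wrapped));
    }
}
```

If this matches what is going on, the useful detail for the JIRA is the runtime type of the array elements right after the read, before the coder touches them.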
>
>
>
>
>
