Hi all,
Thanks for all the discussions so far (including the discussions in BEAM-11721 and offline). We will use BEAM-11650 to track the request to make the Avro schema optional for the ParquetIO.read operation. I can potentially work on that ticket later.
There is another issue that I hope to get some help with from the Beam
community. I have posted this question in the Beam Slack channels but I am
duplicating it here for visibility. Basically I am using ParquetIO (which uses
AvroParquetReader) to read Spark-created Parquet files (please see attached).
The inspection result is below. You can see the Spark schema is very simple:
just a single field holding an array of integers:
creator: parquet-mr version 1.10.1 (build 815bcfa4a4aacf66d207b3dc692150d16b5740b9)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"numbers","type":{"type":"array","elementType":"integer","containsNull":true},"nullable":true,"metadata":{}}]}
file schema: spark_schema
--------------------------------------------------------------------------------
numbers: OPTIONAL F:1
.list: REPEATED F:1
..element: OPTIONAL INT32 R:1 D:3
When I use ParquetIO to read this file, the Avro schema for the
PCollection<GenericRecord> becomes:
{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "list",
            "fields": [
              {
                "name": "element",
                "type": ["null", "int"],
                "default": null
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}
You can see that the field becomes an array of a record type (which contains an
"element" field). The reason is probably that Spark's Parquet writer internally
defines a "list" wrapper record (the three-level Parquet list layout visible in
the inspection output above).
The problem is that this Avro schema is not the one I want to deal with in the
following Beam transforms. Instead I want to retain the original schema defined
in Spark, which is simply an array of integers. Is there an easy way to retain
the original schema when using ParquetIO to read Spark-created files? Did
anyone run into this need? Please advise. Thanks!
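In case it helps the discussion, below is a rough sketch of the workaround I am considering: a ParDo that unwraps the "list"/"element" wrapper records into a flat nullable array of ints. This is only a sketch (the flat schema string is hand-written from the schemas above), not a built-in ParquetIO feature:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class UnwrapSparkList {
  // Flat target schema: "numbers" as a plain nullable array of nullable ints.
  private static final Schema FLAT_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"spark_schema\",\"fields\":["
          + "{\"name\":\"numbers\",\"type\":[\"null\","
          + "{\"type\":\"array\",\"items\":[\"null\",\"int\"]}],\"default\":null}]}");

  public static PCollection<GenericRecord> unwrap(PCollection<GenericRecord> wrapped) {
    return wrapped
        .apply(ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            GenericRecord in = c.element();
            GenericRecord out = new GenericData.Record(FLAT_SCHEMA);
            Object numbers = in.get("numbers");
            if (numbers != null) {
              List<Integer> flat = new ArrayList<>();
              for (Object wrapper : (Iterable<?>) numbers) {
                // Each array element comes back as a {"element": int|null} record.
                flat.add((Integer) ((GenericRecord) wrapper).get("element"));
              }
              out.put("numbers", flat);
            }
            c.output(out);
          }
        }))
        .setCoder(AvroCoder.of(FLAT_SCHEMA));
  }
}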
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Saturday, January 30, 2021 at 11:21 PM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Please let me rephrase my question. It's understandable that it might be good
practice to specify an Avro schema when reading Parquet files (e.g., to support
schema evolution). But sometimes the overhead is more than the benefit.
Given that AvroParquetReader does not require an Avro schema, is it possible to
make the Avro schema specification optional for ParquetIO.read? Thanks!
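For reference, here is a minimal sketch (plain parquet-avro, outside of Beam) of what I mean: AvroParquetReader derives the Avro schema from the file footer, and no caller-supplied schema is needed. The path below is a placeholder:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class SchemalessParquetRead {
  public static void main(String[] args) throws Exception {
    // No Avro schema is passed in; it is derived from the Parquet footer.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("/tmp/from-spark.snappy.parquet"))
            .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record.getSchema());
        System.out.println(record);
      }
    }
  }
}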
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Saturday, January 30, 2021 at 1:54 PM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Hi all,
I have made good progress figuring out the root cause of this issue. The
details are in BEAM-11721. I asked some questions at the end of the JIRA and I
am duplicating them here for visibility. Thanks so much for the help and
support from the community!
1. It's not quite intuitive to have to create an Avro schema for ParquetIO that
contains Spark-defined fields ("list", "element", etc.) when we are ingesting
Spark-created Parquet files. Is it possible to support the standard Avro
definition for the array type (Spark's own metadata describes it as
{"type":"array","elementType":"integer","containsNull":true}; see the plain
Avro form sketched after this list)? Can Beam do the schema translation under
the hood to avoid the hassle for users?
2. Taking a step back, why does ParquetIO require an Avro schema
specification, while AvroParquetReader does not actually require one? I
briefly looked at the ParquetIO source code but have not figured it out yet.
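For comparison, the plain Avro form of the "numbers" field, without the Spark wrapper records, would be something like this (hand-written, not produced by any tool):

{
  "name": "numbers",
  "type": ["null", {"type": "array", "items": ["null", "int"]}],
  "default": null
}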
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 3:37 PM
To: Chamikara Jayalath <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Thanks @Chamikara Jayalath! I created this JIRA:
https://issues.apache.org/jira/browse/BEAM-11721
From: Chamikara Jayalath <[email protected]>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Sounds like a bug. I think a JIRA with a test case will still be helpful.
On Fri, Jan 29, 2021 at 2:33 PM Tao Li <[email protected]> wrote:
@Chamikara Jayalath Sorry about the confusion. But I did more testing, and
using the Spark runner actually yields the same error:
java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
  at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
  at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
  at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
  at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
  at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
From: Chamikara Jayalath <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Thanks. It might be something good to document in case other users run into
this as well. Can you file a JIRA with the details?
On Fri, Jan 29, 2021 at 10:47 AM Tao Li <[email protected]> wrote:
OK, I think this issue is due to an incompatibility between the Parquet files
(created with Spark 2.4) and the Parquet version pulled in as a dependency of
ParquetIO 2.25. It seems to work after I switched to the Spark runner (from the
direct runner) and ran the Beam app on a Spark cluster. I assume that by doing
this I am using the Parquet jars from the Spark distribution directly, and now
everything is compatible.
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, January 29, 2021 at 7:45 AM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Hi community,
Can someone take a look at this issue? It is kind of a blocker for me right now.
Really appreciate your help!
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "[email protected]" <[email protected]>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
BTW I tried Avro 1.8 and 1.9, and both produce the same error, so we can
probably rule out an Avro version issue.
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "[email protected]" <[email protected]>
Subject: Potential bug with ParquetIO.read when reading arrays
Hi Beam community,
I am seeing an error when reading an array field using ParquetIO. I was using
Beam 2.25 and the direct runner for testing. Is this a bug or a known issue? Am
I missing anything here? Please help me root-cause this issue. Thanks so much!
Attached are the Avro schema and the Parquet file. Below is the schema tree as
a quick visualization. The array field name is "list" and the element type is
int. You can see this schema defined in the avsc file as well.
root
|-- list: array (nullable = true)
| |-- element: integer (containsNull = true)
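For reference, a minimal avsc matching this tree would look something like the following (the record name "root" is a placeholder; the attached file has the exact definition):

{
  "type": "record",
  "name": "root",
  "fields": [
    {
      "name": "list",
      "type": ["null", {"type": "array", "items": ["null", "int"]}],
      "default": null
    }
  ]
}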
The Beam code is very simple:
pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
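Expanded into a self-contained sketch of the failing setup (the schema string and path below are placeholders standing in for the attached files):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadSparkParquet {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the attached avsc: a nullable array of nullable ints.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"root\",\"fields\":["
            + "{\"name\":\"list\",\"type\":[\"null\","
            + "{\"type\":\"array\",\"items\":[\"null\",\"int\"]}],\"default\":null}]}");

    // Placeholder path to the attached Spark-written file.
    String parquetPath = "/tmp/from-spark.snappy.parquet";

    PCollection<GenericRecord> records =
        pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));

    pipeline.run().waitUntilFinish();
  }
}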
Here is the error when running that code:
[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
  at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
  at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
  at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
  at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
  at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
  at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
  at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
  at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
Attachment: from-spark.snappy.parquet