For future reference: this was an issue in Avro type conversion. For the
full description and a solution, see
<https://lists.apache.org/thread.html/rc4d468169920a81362c23b320752b7073afa4ab1eea3b6bfd6b9c93b%40%3Cuser.avro.apache.org%3E>.
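
As a rough illustration of the kind of conversion involved (a sketch only,
not necessarily the fix described in the linked thread): the Avro
specification defines a decimal logical type over bytes as the big-endian
two's-complement encoding of the unscaled value. The trace quoted below
shows a raw BigDecimal reaching GenericData.resolveUnion, which apparently
could not match it to any branch of the union. One workaround is to store
the already-encoded bytes in the record instead. The DecimalBytes class and
its method names are hypothetical and use only the JDK; the scale of 4 comes
from the schema in the quoted message.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of Avro or Beam.
final class DecimalBytes {
  private DecimalBytes() {}

  // Encodes a BigDecimal as the big-endian two's-complement bytes of its
  // unscaled value, after forcing it to the scale declared in the schema.
  static ByteBuffer toDecimalBytes(BigDecimal value, int scale) {
    // UNNECESSARY makes setScale throw ArithmeticException instead of
    // silently rounding when the value has more fractional digits than scale.
    BigDecimal scaled = value.setScale(scale, RoundingMode.UNNECESSARY);
    return ByteBuffer.wrap(scaled.unscaledValue().toByteArray());
  }

  // Decodes bytes produced by toDecimalBytes back into a BigDecimal.
  static BigDecimal fromDecimalBytes(ByteBuffer bytes, int scale) {
    byte[] raw = new byte[bytes.remaining()];
    bytes.duplicate().get(raw); // duplicate() leaves the caller's position untouched
    return new BigDecimal(new BigInteger(raw), scale);
  }

  public static void main(String[] args) {
    // The failing value from the error message, with the schema's scale of 4.
    ByteBuffer encoded = toDecimalBytes(new BigDecimal("170.0"), 4);
    System.out.println(fromDecimalBytes(encoded, 4)); // prints 170.0000
  }
}
```

The sketch does not check the schema's precision (12 here). Avro also ships
org.apache.avro.Conversions.DecimalConversion, which performs this encoding
with validation against the schema and is probably the better choice in real
code.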

-B

On Mon, Oct 5, 2020 at 2:33 PM Bashir Sadjad <[email protected]> wrote:

> Hi all,
>
> I am trying to write Avro GenericRecords into Parquet files
> using ParquetIO. I am hitting an error in Beam's AvroCoder, and I am
> not sure whether the problem is in Beam or in Avro, hence asking here
> first. A simplified version of my code looks like this:
>
> class FetchSearchPageFn extends DoFn<SearchSegmentDescriptor, GenericRecord> {
>   @ProcessElement
>   public void ProcessElement(@Element SearchSegmentDescriptor segment,
>       OutputReceiver<GenericRecord> out) {
>     List<GenericRecord> records = [CONVERT segment TO A LIST OF GenericRecords];
>     for (GenericRecord record : records) {
>       out.output(record);  // This is where the error happens.
>     }
>   }
> }
> .......
> PCollection<GenericRecord> records = inputSegments
>     .apply(ParDo.of(new FetchSearchPageFn([ARGS])))
>     .setCoder(AvroCoder.of(GenericRecord.class, schema));
>
> I have tried to minimize this code and removed the parts that I believe
> are fine. The error happens in AvroCoder.encode. After some debugging, it
> seems the Avro serialization code fails when converting a BigDecimal into
> a byte array, specifically here:
> <https://github.com/apache/avro/blob/9a6aa43c8699ccde7bef485206928acb35378bdb/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L912>
>
> An exception occured while executing the Java class.
> org.apache.avro.AvroRuntimeException: Unknown datum type
> java.math.BigDecimal: 170.0
>
> And when I checked the Avro schema (it is generated by another library)
> the schema for the above BigDecimal field that is failing is:
>
>  {
>   "name":"value",
>   "type":[
>     "null",
>     {
>       "type":"bytes",
>       "logicalType":"decimal",
>       "precision":12,
>       "scale":4
>     }
>   ],
>   "default":null
> },
>
> i.e., it is a logical decimal type. I am wondering if I am setting my
> pipeline and/or the AvroCoder incorrectly or whether this is an issue in
> the Avro code?
>
> The root cause is:
>
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
> java.math.BigDecimal: 170.0
>    at org.apache.avro.generic.GenericData.getSchemaName (
> GenericData.java:912)
>    at org.apache.avro.generic.GenericData.resolveUnion (
> GenericData.java:874)
>    at org.apache.avro.generic.GenericDatumWriter.resolveUnion (
> GenericDatumWriter.java:272)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:143)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.writeField (
> GenericDatumWriter.java:221)
>    at org.apache.avro.generic.GenericDatumWriter.writeRecord (
> GenericDatumWriter.java:210)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:131)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:145)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.writeField (
> GenericDatumWriter.java:221)
>    at org.apache.avro.generic.GenericDatumWriter.writeRecord (
> GenericDatumWriter.java:210)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:131)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:145)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.writeField (
> GenericDatumWriter.java:221)
>    at org.apache.avro.generic.GenericDatumWriter.writeRecord (
> GenericDatumWriter.java:210)
>    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (
> GenericDatumWriter.java:131)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:83)
>    at org.apache.avro.generic.GenericDatumWriter.write (
> GenericDatumWriter.java:73)
>    at org.apache.beam.sdk.coders.AvroCoder.encode (AvroCoder.java:312)
>    at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (
> CoderUtils.java:82)
>    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (
> CoderUtils.java:66)
>    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (
> CoderUtils.java:51)
>    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>    at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>
> (MutationDetectors.java:115)
>    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (
> MutationDetectors.java:46)
>    at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
> (ImmutabilityCheckingBundleFactory.java:112)
>    at
> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (
> ParDoEvaluator.java:299)
>    at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue
> (SimpleDoFnRunner.java:258)
>    at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$700
> (SimpleDoFnRunner.java:78)
>    at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
> (SimpleDoFnRunner.java:627)
>    at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output
> (DoFnOutputReceivers.java:73)
>    at org.openmrs.analytics.FhirEtl$FetchSearchPageFn.ProcessElement (
> FhirEtl.java:205)
>
> Thanks
>
> -B
>
