For future reference: this turned out to be an issue in Avro type conversion. For the full description and a solution, see <https://lists.apache.org/thread.html/rc4d468169920a81362c23b320752b7073afa4ab1eea3b6bfd6b9c93b%40%3Cuser.avro.apache.org%3E>.
-B

On Mon, Oct 5, 2020 at 2:33 PM Bashir Sadjad <[email protected]> wrote:

> Hi all,
>
> I am trying to write Avro GenericRecords into Parquet files using
> ParquetIO. I am having an issue with Beam's AvroCoder, and I am not sure
> whether it is a Beam or an Avro problem, hence asking here first. A
> simplified version of my code looks like this:
>
>     class FetchSearchPageFn extends DoFn<SearchSegmentDescriptor, GenericRecord> {
>       @ProcessElement
>       public void ProcessElement(@Element SearchSegmentDescriptor segment,
>           OutputReceiver<GenericRecord> out) {
>         List<GenericRecord> records = [CONVERT segment TO A LIST OF GenericRecords];
>         for (GenericRecord record : records) {
>           out.output(record); // This is where the error happens.
>         }
>       }
>     .......
>     PCollection<GenericRecord> records = inputSegments
>         .apply(ParDo.of(new FetchSearchPageFn([ARGS])))
>         .setCoder(AvroCoder.of(GenericRecord.class, schema));
>
> I have tried to minimize this code and removed the parts that I believe
> are fine. The error happens in AvroCoder.encode. After some debugging, it
> seems the Avro serialization code fails when converting a BigDecimal into
> a byte array, specifically here
> <https://github.com/apache/avro/blob/9a6aa43c8699ccde7bef485206928acb35378bdb/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L912>:
>
>     An exception occured while executing the Java class.
>     org.apache.avro.AvroRuntimeException: Unknown datum type java.math.BigDecimal: 170.0
>
> When I checked the Avro schema (it is generated by another library), the
> schema for the failing BigDecimal field is:
>
>     {
>       "name": "value",
>       "type": [
>         "null",
>         {
>           "type": "bytes",
>           "logicalType": "decimal",
>           "precision": 12,
>           "scale": 4
>         }
>       ],
>       "default": null
>     },
>
> i.e., it is a decimal logical type. Am I setting up my pipeline and/or the
> AvroCoder incorrectly, or is this an issue in the Avro code?
>
> The root cause is:
>
>     Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.math.BigDecimal: 170.0
>         at org.apache.avro.generic.GenericData.getSchemaName (GenericData.java:912)
>         at org.apache.avro.generic.GenericData.resolveUnion (GenericData.java:874)
>         at org.apache.avro.generic.GenericDatumWriter.resolveUnion (GenericDatumWriter.java:272)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:143)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.writeField (GenericDatumWriter.java:221)
>         at org.apache.avro.generic.GenericDatumWriter.writeRecord (GenericDatumWriter.java:210)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:131)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:145)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.writeField (GenericDatumWriter.java:221)
>         at org.apache.avro.generic.GenericDatumWriter.writeRecord (GenericDatumWriter.java:210)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:131)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:145)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.writeField (GenericDatumWriter.java:221)
>         at org.apache.avro.generic.GenericDatumWriter.writeRecord (GenericDatumWriter.java:210)
>         at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion (GenericDatumWriter.java:131)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:83)
>         at org.apache.avro.generic.GenericDatumWriter.write (GenericDatumWriter.java:73)
>         at org.apache.beam.sdk.coders.AvroCoder.encode (AvroCoder.java:312)
>         at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>         at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>         at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>         at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>         at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>         at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:115)
>         at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:46)
>         at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:112)
>         at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:299)
>         at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:258)
>         at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:78)
>         at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:627)
>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output (DoFnOutputReceivers.java:73)
>         at org.openmrs.analytics.FhirEtl$FetchSearchPageFn.ProcessElement (FhirEtl.java:205)
>
> Thanks
>
> -B
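The trace shows the failure inside `GenericData.resolveUnion`: when no logical-type conversion for `decimal` is registered, the generic writer cannot match a `java.math.BigDecimal` to the `bytes` branch of the `["null", bytes/decimal]` union. Per the Avro specification, a `decimal` on `bytes` is stored as the big-endian two's-complement bytes of the unscaled integer, and the scale comes from the schema rather than the data. The stdlib-only sketch below performs that conversion for the `precision: 12, scale: 4` field above. The class `DecimalBytes` and its methods are hypothetical illustrations, not Avro or Beam API, and this is not necessarily the solution described in the linked thread. In the Avro Java library the same work is done by `Conversions.DecimalConversion`, which can be registered with `GenericData#addLogicalTypeConversion`. The explicit rescaling step is an assumption on my part: some Avro versions reject a value whose scale (here 1, from `170.0`) differs from the schema's scale.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;

// Hypothetical helper (not part of Avro or Beam): encodes/decodes a BigDecimal
// the way the Avro spec defines the "decimal" logical type on "bytes".
public class DecimalBytes {

  // Rescale to the schema's scale, check precision, and return the
  // big-endian two's-complement bytes of the unscaled value.
  public static ByteBuffer toBytes(BigDecimal value, int precision, int scale) {
    // RoundingMode.UNNECESSARY throws ArithmeticException instead of silently
    // rounding when the value has more fractional digits than the scale allows.
    BigDecimal rescaled = value.setScale(scale, RoundingMode.UNNECESSARY);
    if (rescaled.precision() > precision) {
      throw new IllegalArgumentException(
          "Value " + value + " exceeds precision " + precision);
    }
    return ByteBuffer.wrap(rescaled.unscaledValue().toByteArray());
  }

  // Inverse conversion: the bytes hold only the unscaled integer,
  // so the scale must be supplied from the schema.
  public static BigDecimal fromBytes(ByteBuffer buffer, int scale) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
    return new BigDecimal(new BigInteger(bytes), scale);
  }

  public static void main(String[] args) {
    // The failing value from the trace, against the schema's precision 12, scale 4.
    BigDecimal original = new BigDecimal("170.0");
    ByteBuffer encoded = toBytes(original, 12, 4);
    // Unscaled value is 1700000 (0x19F0A0), which needs three bytes.
    System.out.println(encoded.remaining()); // 3
    BigDecimal decoded = fromBytes(encoded, 4);
    System.out.println(decoded); // 170.0000
    System.out.println(decoded.compareTo(original) == 0); // true
  }
}
```

One design consequence: if the record producer stores the resulting `ByteBuffer` in the field instead of the raw `BigDecimal`, the generic writer resolves the union to its `bytes` branch without any conversion registered, at the cost of readers having to apply the schema's scale themselves.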
