I am getting apparently non-fatal ClassCastExceptions for an enum, but only when the pipeline runs on the DataflowRunner; on the DirectRunner it works just fine. The full stack trace (provided at the end, as it is ... lengthy) references none of my code other than the class name of the enum. (Usually my first stop is looking for problems in my own code, because ... you know ... I wrote it.)

Overview of the pipeline:
A streaming pipeline: Pub/Sub -> wrapper class for a protobuf-generated class -> some transforms into another protobuf class wrapper -> ParquetIO to GCS with dynamic destinations.
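
In code, the overall shape is roughly the following (a sketch only, not the real job: the generated proto class IngestSourceProto, the getSubscription() option, and the wrapper constructor are hypothetical placeholders, and the intermediate transforms are elided):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    // Sketch of the topology described above; names flagged in comments are assumptions
    Pipeline pipeline = Pipeline.create(options);

    PCollection<EvaluatedIngestSourceWrapper> flattened =
        pipeline
            .apply(
                "Read Pub/Sub",
                PubsubIO.readProtos(IngestSourceProto.class) // hypothetical proto class
                    .fromSubscription(options.getSubscription())) // hypothetical option
            .apply("Window", Window.into(FixedWindows.of(windowSize)))
            .apply(
                "To wrapper",
                MapElements.into(TypeDescriptor.of(EvaluatedIngestSourceWrapper.class))
                    .via(EvaluatedIngestSourceWrapper::new)); // hypothetical constructor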

I create the Avro schema by deriving the Beam schema from the JavaBean class and translating it to Avro:

    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.utils.AvroUtils;
    import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;

    // Infer a Beam schema from the wrapper's getters, then translate it to Avro
    beamSchema =
        JavaBeanUtils.schemaFromJavaBeanClass(
            EvaluatedIngestSourceWrapper.class,
            JavaBeanSchema.GetterTypeSupplier.INSTANCE);

    avroSchema = AvroUtils.toAvroSchema(beamSchema);

Then I use that schema to create a GenericRecordBuilder, build the GenericRecord, and return it to be serialized.
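
For context, the conversion looks roughly like this (a minimal sketch, not the production code: the accessor getEvalDupIdentifiersState() is hypothetical, and the real method sets every field of the record, not just the enum):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;

    // Sketch of EvaluatedIngestSourceWrapper#asGenericRecord; assumes avroSchema
    // (from above) is accessible here, e.g. as a static field
    public GenericRecord asGenericRecord() {
      GenericRecordBuilder builder = new GenericRecordBuilder(avroSchema);
      // The field is a ["null", enum] union, so pick the enum branch explicitly;
      // Avro's generic API represents enum values as GenericData.EnumSymbol
      Schema enumSchema =
          avroSchema.getField("evalDupIdentifiersState").schema().getTypes().get(1);
      builder.set(
          "evalDupIdentifiersState",
          new GenericData.EnumSymbol(enumSchema, getEvalDupIdentifiersState().name()));
      return builder.build();
    }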

The ParquetIO step; `flattened` is a `PCollection<EvaluatedIngestSourceWrapper>`:

    flattened.apply(
        "Write to GCS",
        FileIO.<FileNamingData, EvaluatedIngestSourceWrapper>writeDynamic()
            .by(
                eisw -> {
                  assert eisw != null;
                  // Must provide this as a static value provider created outside
                  // of the lambda, as otherwise it cannot be serialized
                  return new FileNamingData(eisw, windowSize);
                })
            .via(
                Contextful.fn(EvaluatedIngestSourceWrapper::asGenericRecord),
                ParquetIO.sink(avroSchema))
            .to("gs://" + options.getFilenamePrefix())
            .withNaming(FileNamingData::fileNamingForRow)
            .withDestinationCoder(
                pipeline.getSchemaRegistry().getSchemaCoder(FileNamingData.class))
            .withNumShards(options.getNumShards()));
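
FileNamingData itself is not shown above; as a rough sketch it has this shape (hypothetical: the field, the constructor body, getSourceName(), and the joda Duration type for windowSize are all assumptions):

    import java.io.Serializable;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.joda.time.Duration;

    // Hypothetical sketch of the dynamic-destination type. It is a JavaBean so
    // that getSchemaRegistry().getSchemaCoder(FileNamingData.class) above can
    // infer a coder for it.
    @DefaultSchema(JavaBeanSchema.class)
    public class FileNamingData implements Serializable {
      private String pathPrefix = "";

      public FileNamingData() {}

      public FileNamingData(EvaluatedIngestSourceWrapper eisw, Duration windowSize) {
        // Illustrative only: derive a per-destination prefix from the element
        this.pathPrefix = eisw.getSourceName();
      }

      public String getPathPrefix() { return pathPrefix; }

      public void setPathPrefix(String pathPrefix) { this.pathPrefix = pathPrefix; }

      // Referenced above as .withNaming(FileNamingData::fileNamingForRow)
      public FileIO.Write.FileNaming fileNamingForRow() {
        return FileIO.Write.defaultNaming(pathPrefix, ".parquet");
      }
    }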

This works perfectly under the DirectRunner, but when I deploy it to Dataflow it racks up errors, though not for every record processed. I verified via the debugger that the Avro schema ends up with this definition of the enum field:

    {
      "name" : "evalDupIdentifiersState",
      "type" : [ "null", {
        "type" : "enum",
        "name" : "evalDupIdentifiersState",
        "doc" : "",
        "symbols" : [ "potentialDuplicateIdentifier", "newIdentifier", "duplicateIdentifier", "UNRECOGNIZED" ]
      } ]
    }
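
(For what it's worth, the same definition can be dumped without a debugger by logging the resolved field schema at pipeline-construction time:)

    // Print the enum field's resolved schema as pretty-printed JSON
    System.out.println(
        avroSchema.getField("evalDupIdentifiersState").schema().toString(true));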

The full stack trace follows:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:187)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:108)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1 (ReduceFnRunner.java:1060)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output (ReduceFnContextFactory.java:445)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger (SystemReduceFn.java:130)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger (ReduceFnRunner.java:1063)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit (ReduceFnRunner.java:934)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers (ReduceFnRunner.java:795)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement (StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement (StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:121)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement (LateDataDroppingDoFnRunner.java:81)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:137)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:212)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start (ReadOperation.java:163)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:92)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1430)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:165)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run (StreamingDataflowWorker.java:1109)
        at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:628)
        at java.lang.Thread.run (Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
        at org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:39)
        at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement (Unknown Source)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement (SimpleParDoFn.java:339)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:185)
Caused by: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion (AvroWriteSupport.java:329)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue (AvroWriteSupport.java:284)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields (AvroWriteSupport.java:197)
        at org.apache.parquet.avro.AvroWriteSupport.write (AvroWriteSupport.java:171)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write (InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write (ParquetWriter.java:301)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.write (ParquetIO.java:1063)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.write (ParquetIO.java:1008)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.write (FileIO.java:1382)
        at org.apache.beam.sdk.io.WriteFiles.writeOrClose (WriteFiles.java:584)
        at org.apache.beam.sdk.io.WriteFiles.access$1000 (WriteFiles.java:116)
        at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement (WriteFiles.java:775)


Andrew Kettmann
DevOps Engineer
