Thanks for the swift response Brian, Andrew. I've tried your suggestion Brian, and sadly I get the same error as the lengthy call stack from the end of my original post (IllegalStateException) - it appears the PCollection might have been finalised my the DoFn, and therefore I cannot setRowSchema against it? In the fully implemented version I captured in my original post you can see I call withSchema when creating the Row objects, though interestingly the cutdown version I also posted gives the same error, even though it's passing the input row to the output without mutating it?
Regarding the NULL values from Beam SQL aggregations, I've re-run my pipeline with my NullValueHandler commented out, and unfortunately I can still see min and max integers being written back to BigQuery. Is there anything you'd like me to test to get you some further feedback? Thanks again! On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <[email protected]> wrote: > Beam SQL's null aggregation behavior changed radically in 2.34.0. > https://github.com/apache/beam/pull/15174 > > I think we drop them now? > > Andrew > > > On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <[email protected]> > wrote: > >> Hi Jimmy, >> >> Sorry about this, I wonder if this error message could be more helpful? >> You're right that the issue is that the output PCollection produced by >> HandleNullValues doesn't have a schema attached to it. Beam has no way of >> inferring the output schema through the opaque DoFn. A quick solution might >> be to just propagate the schema from the SQL output: >> >> PCollection<Row> sqlOutput = inputCollection.apply( >> "Generate Aggregates", >> >> SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql")) >> ) >> >> PCollection<Row> aggregates = sqlOutput.apply(ParDo.of(new >> HandleNullValues())).setRowSchema(inputCollection.getSchema()) >> >> @Reuven Lax <[email protected]> may have some other ideas. >> >> Stepping back to the reason you need to add HandleNullValues: "I call >> this function within `ParDo.of` to detect `Double.MAX_VALUE` and >> `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL >> returns the Double min/max values when it encounters a `NULL` value, rather >> than just returning NULL." >> @Andrew Pilloud <[email protected]> is this intended? Do you know if >> there's any way to modify this behavior? >> >> Brian >> >> On Fri, Apr 15, 2022 at 1:14 AM Jimmy Headdon < >> [email protected]> wrote: >> >>> Hello >>> >>> I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to >>> v2.37.0 (Java 8 & Maven), but have run into an issue with a breaking change >>> that I'd appreciate some support with. Sorry this is quite a long one, I >>> wanted to capture as much context as I could, but please shout if there's >>> anything you'd like to dig into. >>> >>> I'd note that I've also raised this on StackOverflow, if you find it >>> easier to read the Markdown there - >>> https://stackoverflow.com/q/71875593/18805546. >>> >>> I'm using Beam inside GCP Dataflow to read data from BigQuery, then >>> processing aggregates before I write the results back to BigQuery. I'm >>> able to read from/write to BigQuery without issue, but after the upgrade my >>> pipeline to calculate aggregates is failing at runtime, specifically a >>> `DoFn` I have written to sanitise the results returned from the Beam >>> `SqlTransform.query` command. I call this function within `ParDo.of` to >>> detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling MIN/MAX >>> aggregates in Beam SQL returns the Double min/max values when it encounters >>> a `NULL` value, rather than just returning NULL. I did try filtering the >>> initial BigQuery raw data results, but this issue creeps in at the Beam SQL >>> level. >>> >>> There may be better ways to do this (I'm open to suggestions!). I've >>> included a bunch of code snippets from my pipeline that I've tried to >>> simplify, so apologies if there's anything obviously janky. Here's what I >>> previously had before the library upgrade: >>> >>> PCollection<Row> aggregates = inputCollection.apply( >>> "Generate Aggregates", >>> >>> SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql")) >>> ) >>> .apply(ParDo.of(new HandleNullValues())); >>> >>> I've included the `HandleNullValues` definition at the bottom of this >>> email, but it appears v2.21.0 introduced a breaking change whereby the >>> coder inference was disabled for Beam Row types in [this ticket]( >>> https://issues.apache.org/jira/browse/BEAM-9569). This change has >>> caused the above code to fail with the following runtime error: >>> >>> > [ERROR] Failed to execute goal >>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on >>> > project dataflow-example: An exception occured while executing the >>> > Java class. Unable to return a default Coder for >>> > ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output >>> > [PCollection@83398426]. Correct one of the following root causes: >>> > [ERROR] No Coder has been manually specified; you may do so using >>> > .setCoder(). [ERROR] Inferring a Coder from the CoderRegistry >>> > failed: Cannot provide a coder for a Beam Row. Please provide a schema >>> > instead using PCollection.setRowSchema. [ERROR] Using the default >>> > output Coder from the producing PTransform failed: >>> > PTransform.getOutputCoder called. >>> >>> I've followed the advice on the aforementioned JIRA ticket, plus a bunch >>> of other examples I found online, but without much joy. I've tried >>> applying `setCoder(SerializableCoder.of(Row.class))` after the >>> `.apply(ParDo.of(new HandleNullValues()))` which fixes this error (though >>> I'm not yet sure if it's just suppressed the error, or if it's actually >>> working), but that changes causes another runtime error: >>> >>> > [ERROR] Failed to execute goal >>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on >>> > project dataflow-example: An exception occured while executing the >>> > Java class. Cannot call getSchema when there is no schema -> [Help 1] >>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to >>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java >>> > (default-cli) on project dataflow-example: An exception occured while >>> > executing the Java class. Cannot call getSchema when there is no >>> > schema >>> >>> This error is thrown further down my pipeline, when I perform a >>> subsequent `SqlTransform.query` to JOIN some results together. >>> >>> PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates) >>> .and(new TupleTag<Row>("Experiments"), experiments) >>> .apply("Joining Aggregates to Experiments", >>> SqlTransform.query(aggregateExperimentJoin())) >>> .apply(ParDo.of(new >>> MapBeamRowsToBigQueryTableRows())) >>> .apply(BigQueryIO.writeTableRows() >>> >>> .withCreateDisposition(CreateDisposition.CREATE_NEVER) >>> >>> .withWriteDisposition(WriteDisposition.WRITE_APPEND) >>> >>> .to(NestedValueProvider.of(options.getProjectId(),(SerializableFunction<String, >>> String>) projectId -> projectId + ":daily_aggregates.experiments"))); >>> >>> I've verified the `aggregates` collection is indeed missing a schema if >>> I interrogate the `hasSchema` property. The second `experiments` >>> PCollection above does have a row schema set though: >>> >>> PCollection<Row> rawExperiments = rows.apply( >>> SqlTransform.query("select sessionId, experiments from >>> PCOLLECTION") >>> ); >>> PCollection<Row> experiments = rawExperiments.apply(ParDo.of(new >>> CustomFunctions.ParseExperiments(bigQuerySchema))); >>> experiments.setRowSchema(bigQuerySchema); >>> >>> I've also tried applying this coder at the pipeline level, with >>> different variations on the following. But this also gives the same error: >>> >>> CoderRegistry cr = pipeline.getCoderRegistry(); >>> cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema)); >>> cr.registerCoderForType(TypeDescriptors.rows(), >>> RowCoder.of(bigQuerySchema)); >>> >>> The `bigQuerySchema` object referenced above is the initial schema used >>> to retrieve all raw data from BigQuery, though that part of the pipeline >>> works fine, so potentially I need to pass the `aggregatesSchema` object >>> (see below) in to `registerCoderForType` for the pipeline? >>> >>> I then tried to set the row schema on `aggregates` (which was another >>> suggestion in the error above). I've confirmed that calling `setCoder` is >>> responsible for the previous `Row` schema disappearing, where it had >>> previously been set by the input PCollection (and also if I call >>> `setRowSchema` immediately before I call the `DoFn`. >>> >>> I've simplified the schema for succinctness in this post, but it's a >>> subset of `bigQuerySchema` with a few new fields (simple data types). >>> Here's what I've tried, again with various combinations of where I call >>> `setCoder` and `setRowSchema` (before `apply()` and/or after). >>> >>> Schema aggregatesSchema = Schema.builder() >>> .addNullableField("userId", FieldType.STRING) >>> .addNullableField("sessionId", FieldType.STRING) >>> .addNullableField("experimentsPerDay", FieldType.INT64) >>> .build(); >>> >>> PCollection<Row> aggregates = inputCollection.apply( >>> "Generate Aggregates", >>> >>> SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql")) >>> ) >>> .apply(ParDo.of(new HandleNullValues())) >>> .setCoder(SerializableCoder.of(Row.class)) >>> .setRowSchema(aggregatesSchema); >>> >>> Unfortunately, this causes a third runtime error which I've not been >>> able to figure out: >>> >>> > [ERROR] Failed to execute goal >>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on >>> > project dataflow-example: An exception occured while executing the >>> > Java class. java.lang.IllegalStateException -> [Help 1] >>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to >>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java >>> > (default-cli) on project dataflow-example: An exception occured while >>> > executing the Java class. java.lang.IllegalStateException >>> >>> The full call stack is at the bottom of this email, and I can see it >>> originating from my `HandleNullValues` `DoFn`, but after that it disappears >>> into the Beam libraries. >>> >>> I'm at a loss as to which route is recommended, and how to proceed, as >>> both coder and schema options are causing different issues. >>> >>> Any help would be greatly appreciated, and thanks for your efforts on >>> this project! >>> >>> >>> The full `DoFn` I've referred to is further below, but it's worth noting >>> that just having an essentially empty `DoFn` with both input and output of >>> Beam `Row` types causes the same issue: >>> >>> public static class HandleNullValues extends DoFn<Row, Row> { >>> @ProcessElement >>> public void processElement(ProcessContext c) { >>> Row row = c.element(); >>> c.output(row); >>> } >>> } >>> >>> Here's the full implementation, if anyone can think of a better way to >>> detect and replace `NULL` values returned from Beam SQL: >>> >>> public static class HandleNullValues extends DoFn<Row, Row> { >>> @ProcessElement >>> public void processElement(ProcessContext c) { >>> Row row = c.element(); >>> List<String> fields = row.getSchema().getFieldNames(); >>> Builder rowBuilder = Row.withSchema(row.getSchema()); >>> >>> for (String f: fields) { >>> Object value = row.getValue(f); >>> if (value != null && value instanceof Long) { >>> Long longVal = row.getInt64(f); >>> if (longVal == Long.MAX_VALUE || longVal == >>> Long.MIN_VALUE) { >>> rowBuilder.addValue(null); >>> } else { >>> rowBuilder.addValue(value); >>> } >>> } else if (value != null && value instanceof Double) { >>> Double doubleVal = row.getDouble(f); >>> if (doubleVal == Double.MAX_VALUE || doubleVal == >>> Double.MIN_VALUE) { >>> rowBuilder.addValue(null); >>> } else { >>> rowBuilder.addValue(value); >>> } >>> } else { >>> rowBuilder.addValue(value); >>> } >>> } >>> >>> Row newRow = rowBuilder.build(); >>> c.output(newRow); >>> } >>> } >>> >>> And here's the full callstack from the `setRowSchema` issue detailed >>> above: >>> >>> >>> > [ERROR] Failed to execute goal >>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on >>> > project dataflow-example: An exception occured while executing the >>> > Java class. java.lang.IllegalStateException -> [Help 1] >>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to >>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java >>> > (default-cli) on project dataflow-example: An exception occured while >>> > executing the Java class. java.lang.IllegalStateException >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute >>> (MojoExecutor.java:306) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:211) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:165) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:157) >>> > at >>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject >>> > (LifecycleModuleBuilder.java:121) >>> > at >>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject >>> > (LifecycleModuleBuilder.java:81) >>> > at >>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build >>> > (SingleThreadedBuilder.java:56) >>> > at org.apache.maven.lifecycle.internal.LifecycleStarter.execute >>> (LifecycleStarter.java:127) >>> > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) >>> > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) >>> > at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) >>> > at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) >>> > at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) >>> > at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) >>> > at sun.reflect.NativeMethodAccessorImpl.invoke >>> (NativeMethodAccessorImpl.java:62) >>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke >>> (DelegatingMethodAccessorImpl.java:43) >>> > at java.lang.reflect.Method.invoke (Method.java:498) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced >>> > (Launcher.java:282) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.launch >>> (Launcher.java:225) >>> > at >>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode >>> > (Launcher.java:406) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.main >>> (Launcher.java:347) Caused by: >>> > org.apache.maven.plugin.MojoExecutionException: An exception occured >>> > while executing the Java class. java.lang.IllegalStateException >>> > at org.codehaus.mojo.exec.ExecJavaMojo.execute >>> (ExecJavaMojo.java:311) >>> > at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo >>> (DefaultBuildPluginManager.java:137) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute >>> (MojoExecutor.java:301) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:211) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:165) >>> > at org.apache.maven.lifecycle.internal.MojoExecutor.execute >>> (MojoExecutor.java:157) >>> > at >>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject >>> > (LifecycleModuleBuilder.java:121) >>> > at >>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject >>> > (LifecycleModuleBuilder.java:81) >>> > at >>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build >>> > (SingleThreadedBuilder.java:56) >>> > at org.apache.maven.lifecycle.internal.LifecycleStarter.execute >>> (LifecycleStarter.java:127) >>> > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) >>> > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) >>> > at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) >>> > at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) >>> > at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) >>> > at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) >>> > at sun.reflect.NativeMethodAccessorImpl.invoke >>> (NativeMethodAccessorImpl.java:62) >>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke >>> (DelegatingMethodAccessorImpl.java:43) >>> > at java.lang.reflect.Method.invoke (Method.java:498) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced >>> > (Launcher.java:282) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.launch >>> (Launcher.java:225) >>> > at >>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode >>> > (Launcher.java:406) >>> > at org.codehaus.plexus.classworlds.launcher.Launcher.main >>> (Launcher.java:347) Caused by: >>> > org.apache.beam.sdk.Pipeline$PipelineExecutionException: >>> > java.lang.IllegalStateException >>> > at >>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish >>> > (DirectRunner.java:373) >>> > at >>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish >>> > (DirectRunner.java:341) >>> > at org.apache.beam.runners.direct.DirectRunner.run >>> (DirectRunner.java:218) >>> > at org.apache.beam.runners.direct.DirectRunner.run >>> (DirectRunner.java:67) >>> > at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323) >>> > at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309) >>> > at com.example.dataflow.Pipeline.main (Pipeline.java:284) >>> > at org.codehaus.mojo.exec.ExecJavaMojo$1.run >>> (ExecJavaMojo.java:254) >>> > at java.lang.Thread.run (Thread.java:748) Caused by: >>> java.lang.IllegalStateException >>> > at >>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState >>> > (Preconditions.java:491) >>> > at >>> org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate >>> > (RowCoderGenerator.java:314) >>> > at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode >>> (Unknown Source) >>> > at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode >>> (Unknown Source) >>> > at org.apache.beam.sdk.schemas.SchemaCoder.encode >>> (SchemaCoder.java:124) >>> > at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136) >>> > at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream >>> (CoderUtils.java:85) >>> > at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray >>> (CoderUtils.java:69) >>> > at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray >>> (CoderUtils.java:54) >>> > at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144) >>> > at >>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> >>> > (MutationDetectors.java:118) >>> > at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder >>> (MutationDetectors.java:49) >>> > at >>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add >>> > (ImmutabilityCheckingBundleFactory.java:115) >>> > at >>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output >>> > (ParDoEvaluator.java:305) >>> > at >>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue >>> > (SimpleDoFnRunner.java:268) >>> > at >>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 >>> > (SimpleDoFnRunner.java:84) >>> > at >>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output >>> > (SimpleDoFnRunner.java:416) >>> > at >>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output >>> > (SimpleDoFnRunner.java:404) >>> > at com.example.dataflow.Pipeline$HandleNullValues.processElement >>> (CustomFunctions.java:310) >>> >>> >>> Cheers! >>> >>
