Amazing, thank you for your patience and support with this, Andrew. I can see that you've tagged that JIRA item for the v2.39.0 release. Do you know when that release is scheduled to ship, by any chance?
Thanks also for your explanation of setting coders and schemas. Unfortunately I'm still unable to get a simple example working where DoFn<Row, Row> is used. I've created the Gist below, which initialises a collection of three Row objects, performs a basic Beam SQL aggregation, and finally passes the Row elements through a DoFn where ideally I'd like to create new Row objects, but for now it does nothing. Any time I use a DoFn<Row, Row> my output PCollection loses its row schema, even if I pass through the original Row object, or if I create a new Row object and explicitly call withSchema.

https://gist.github.com/jimmyheaddon/ce1185b83e4ef30e3f9d6fa1b9b16455

You'll see I call getSchema at the bottom of the Gist, which causes the exception "Cannot call getSchema when there is no schema", as the schema which existed before the DoFn has been lost. However, if I try to set the row schema again I instead get an IllegalStateException with no message. So even if I create the Row objects inside the DoFn with a schema, the resultant PCollection has lost it, and I'm unable to set it again, which seems like a bug?

Thanks again, and let me know if you'd like me to test anything else.

On Mon, 18 Apr 2022 at 20:18, Andrew Pilloud <[email protected]> wrote:

> I was able to reproduce this with all the inputs being null, thanks for that pointer. I filed https://issues.apache.org/jira/browse/BEAM-14321 and will look into a fix today.
>
> From your example it looks like the input to SqlTransform doesn't have a schema; it is using a simple coder which doesn't provide the metadata SqlTransform needs to operate on the data. (Also SqlTransform calls setRowSchema internally, so you shouldn't need to call it on the output.)
>
> At a high level, setCoder is saying "here is a method that can serialize my Java object", setRowSchema is saying "here is metadata about my row (so you can serialize it)", and setSchema is saying "here is metadata about my Java object and a method to convert my Java object to a row (so you can serialize it)". The Schema methods are newer and provide more metadata that enables things like SqlTransform to operate on your data.
>
> Calling setRowSchema is required when you are working with Row directly. Generally you shouldn't call setSchema directly as the schema can be inferred from other types:
> https://beam.apache.org/documentation/programming-guide/#inferring-schemas
> (I don't believe we infer schemas from simple types today; you need a POJO.)
>
> Andrew
>
> On Sun, Apr 17, 2022 at 4:36 AM Jimmy Headdon <[email protected]> wrote:
>
>> Hi Brian
>>
>> This is the shortest pipeline Gist I can come up with to demonstrate "java.lang.IllegalStateException: Cannot call getSchema when there is no schema". You'll see I've tried setRowSchema and setCoder, but with the same end result. Any chance you can advise where I'm going wrong, as I wanted to set up a simple pipeline for Andrew on the NULL aggregation results.
>>
>> https://gist.github.com/jimmyheaddon/f0350a29f69c745e31c442942874eb12
>>
>> Thanks again
>>
>> On Sat, 16 Apr 2022 at 09:08, Jimmy Headdon <[email protected]> wrote:
>>
>>> Thanks Andrew, out of interest does your test pass if all of the input values to the MIN/MAX aggregates are NULL? If I have a BigQuery column that contains entirely NULL values, and I then convert them from TableRow to Row types, they're still showing correctly as NULL if I perform a simple System.out.println:
>>>
>>>     rowsFromBigQuery.apply(ParDo.of(new PrintParDo())).setCoder(SerializableCoder.of(Row.class));
>>>     "NULL"
>>>     "NULL"
>>>     "NULL"....
>>>
>>> If I then apply the following Beam SQL on this PCollection:
>>>
>>>     select max(experimentValue) as experimentValue
>>>     from PCOLLECTION
>>>
>>> Then the results come back as -9223372036854775808, or 9223372036854775807 if you use MIN().
>>>
>>> Hopefully I'm doing something silly and it's an easy fix, let me know if there's anything you'd like me to try.
>>>
>>> Cheers
>>>
>>> On Fri, 15 Apr 2022 at 21:20, Andrew Pilloud <[email protected]> wrote:
>>>
>>>> Are you sure the min/max values are coming from SqlTransform? I wrote a quick test in Beam (using Integer, but all types have the same null wrapper) and the nulls were dropped.
>>>>
>>>> More detail: I added the following test case to BeamSqlDslAggregationNullableTest on the latest Beam. The input values for f_int1 are 1, null, 2, null, null, null, 3. The test passed (the return value being 1), which indicates we are dropping nulls before aggregation. (I don't believe this is actually correct behavior, we should be returning null?)
>>>>
>>>>     @Test
>>>>     public void testMin() {
>>>>       String sql = "SELECT min(f_int1) FROM PCOLLECTION";
>>>>
>>>>       PAssert.that(boundedInput.apply(SqlTransform.query(sql))).satisfies(matchesScalar(1));
>>>>       pipeline.run();
>>>>     }
>>>>
>>>> On Fri, Apr 15, 2022 at 11:37 AM Jimmy Headdon <[email protected]> wrote:
>>>>
>>>>> Thanks for the swift response Brian, Andrew. I've tried your suggestion Brian, and sadly I get the same error as the lengthy call stack from the end of my original post (IllegalStateException) - it appears the PCollection might have been finalised by the DoFn, and therefore I cannot setRowSchema against it? In the fully implemented version I captured in my original post you can see I call withSchema when creating the Row objects, though interestingly the cutdown version I also posted gives the same error, even though it's passing the input row to the output without mutating it?
>>>>>
>>>>> Regarding the NULL values from Beam SQL aggregations, I've re-run my pipeline with my NullValueHandler commented out, and unfortunately I can still see min and max integers being written back to BigQuery. Is there anything you'd like me to test to get you some further feedback?
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <[email protected]> wrote:
>>>>>
>>>>>> Beam SQL's null aggregation behavior changed radically in 2.34.0.
>>>>>> https://github.com/apache/beam/pull/15174
>>>>>>
>>>>>> I think we drop them now?
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Jimmy,
>>>>>>>
>>>>>>> Sorry about this, I wonder if this error message could be more helpful?
>>>>>>> You're right that the issue is that the output PCollection produced by HandleNullValues doesn't have a schema attached to it. Beam has no way of inferring the output schema through the opaque DoFn. A quick solution might be to just propagate the schema from the SQL output:
>>>>>>>
>>>>>>>     PCollection<Row> sqlOutput = inputCollection.apply(
>>>>>>>         "Generate Aggregates",
>>>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>>>     );
>>>>>>>
>>>>>>>     PCollection<Row> aggregates = sqlOutput.apply(ParDo.of(new HandleNullValues())).setRowSchema(sqlOutput.getSchema());
>>>>>>>
>>>>>>> @Reuven Lax <[email protected]> may have some other ideas.
>>>>>>>
>>>>>>> Stepping back to the reason you need to add HandleNullValues: "I call this function within `ParDo.of` to detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL returns the Double min/max values when it encounters a `NULL` value, rather than just returning NULL."
>>>>>>> @Andrew Pilloud <[email protected]> is this intended? Do you know if there's any way to modify this behavior?
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Fri, Apr 15, 2022 at 1:14 AM Jimmy Headdon <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hello
>>>>>>>>
>>>>>>>> I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to v2.37.0 (Java 8 & Maven), but have run into an issue with a breaking change that I'd appreciate some support with. Sorry this is quite a long one, I wanted to capture as much context as I could, but please shout if there's anything you'd like to dig into.
>>>>>>>>
>>>>>>>> I'd note that I've also raised this on StackOverflow, if you find it easier to read the Markdown there - https://stackoverflow.com/q/71875593/18805546.
>>>>>>>>
>>>>>>>> I'm using Beam inside GCP Dataflow to read data from BigQuery, then processing aggregates before I write the results back to BigQuery. I'm able to read from/write to BigQuery without issue, but after the upgrade my pipeline to calculate aggregates is failing at runtime, specifically a `DoFn` I have written to sanitise the results returned from the Beam `SqlTransform.query` command. I call this function within `ParDo.of` to detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL returns the Double min/max values when it encounters a `NULL` value, rather than just returning NULL. I did try filtering the initial BigQuery raw data results, but this issue creeps in at the Beam SQL level.
>>>>>>>>
>>>>>>>> There may be better ways to do this (I'm open to suggestions!). I've included a bunch of code snippets from my pipeline that I've tried to simplify, so apologies if there's anything obviously janky. Here's what I previously had before the library upgrade:
>>>>>>>>
>>>>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>>>>         "Generate Aggregates",
>>>>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>>>>     )
>>>>>>>>     .apply(ParDo.of(new HandleNullValues()));
>>>>>>>>
>>>>>>>> I've included the `HandleNullValues` definition at the bottom of this email, but it appears v2.21.0 introduced a breaking change whereby the coder inference was disabled for Beam Row types in [this ticket](https://issues.apache.org/jira/browse/BEAM-9569). This change has caused the above code to fail with the following runtime error:
>>>>>>>>
>>>>>>>> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Unable to return a default Coder for ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output [PCollection@83398426]. Correct one of the following root causes:
>>>>>>>> > [ERROR] No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>> > [ERROR] Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema.
>>>>>>>> > [ERROR] Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>>>
>>>>>>>> I've followed the advice on the aforementioned JIRA ticket, plus a bunch of other examples I found online, but without much joy. I've tried applying `setCoder(SerializableCoder.of(Row.class))` after the `.apply(ParDo.of(new HandleNullValues()))`, which fixes this error (though I'm not yet sure if it's just suppressed the error, or if it's actually working), but that change causes another runtime error:
>>>>>>>>
>>>>>>>> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema -> [Help 1]
>>>>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema
>>>>>>>>
>>>>>>>> This error is thrown further down my pipeline, when I perform a subsequent `SqlTransform.query` to JOIN some results together.
>>>>>>>>
>>>>>>>>     PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
>>>>>>>>         .and(new TupleTag<Row>("Experiments"), experiments)
>>>>>>>>         .apply("Joining Aggregates to Experiments", SqlTransform.query(aggregateExperimentJoin()))
>>>>>>>>         .apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
>>>>>>>>         .apply(BigQueryIO.writeTableRows()
>>>>>>>>             .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>>>>>>>>             .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>>>>>>>>             .to(NestedValueProvider.of(options.getProjectId(), (SerializableFunction<String, String>) projectId -> projectId + ":daily_aggregates.experiments")));
>>>>>>>>
>>>>>>>> I've verified the `aggregates` collection is indeed missing a schema if I interrogate the `hasSchema` property. The second `experiments` PCollection above does have a row schema set though:
>>>>>>>>
>>>>>>>>     PCollection<Row> rawExperiments = rows.apply(
>>>>>>>>         SqlTransform.query("select sessionId, experiments from PCOLLECTION")
>>>>>>>>     );
>>>>>>>>     PCollection<Row> experiments = rawExperiments.apply(ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema)));
>>>>>>>>     experiments.setRowSchema(bigQuerySchema);
>>>>>>>>
>>>>>>>> I've also tried applying this coder at the pipeline level, with different variations on the following. But this also gives the same error:
>>>>>>>>
>>>>>>>>     CoderRegistry cr = pipeline.getCoderRegistry();
>>>>>>>>     cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
>>>>>>>>     cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));
>>>>>>>>
>>>>>>>> The `bigQuerySchema` object referenced above is the initial schema used to retrieve all raw data from BigQuery, though that part of the pipeline works fine, so potentially I need to pass the `aggregatesSchema` object (see below) in to `registerCoderForType` for the pipeline?
>>>>>>>>
>>>>>>>> I then tried to set the row schema on `aggregates` (which was another suggestion in the error above). I've confirmed that calling `setCoder` is responsible for the previous `Row` schema disappearing, where it had previously been set by the input PCollection (and also if I call `setRowSchema` immediately before I call the `DoFn`).
>>>>>>>>
>>>>>>>> I've simplified the schema for succinctness in this post, but it's a subset of `bigQuerySchema` with a few new fields (simple data types). Here's what I've tried, again with various combinations of where I call `setCoder` and `setRowSchema` (before `apply()` and/or after).
>>>>>>>>
>>>>>>>>     Schema aggregatesSchema = Schema.builder()
>>>>>>>>         .addNullableField("userId", FieldType.STRING)
>>>>>>>>         .addNullableField("sessionId", FieldType.STRING)
>>>>>>>>         .addNullableField("experimentsPerDay", FieldType.INT64)
>>>>>>>>         .build();
>>>>>>>>
>>>>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>>>>         "Generate Aggregates",
>>>>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>>>>     )
>>>>>>>>     .apply(ParDo.of(new HandleNullValues()))
>>>>>>>>     .setCoder(SerializableCoder.of(Row.class))
>>>>>>>>     .setRowSchema(aggregatesSchema);
>>>>>>>>
>>>>>>>> Unfortunately, this causes a third runtime error which I've not been able to figure out:
>>>>>>>>
>>>>>>>> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1]
>>>>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException
>>>>>>>>
>>>>>>>> The full call stack is at the bottom of this email, and I can see it originating from my `HandleNullValues` `DoFn`, but after that it disappears into the Beam libraries.
>>>>>>>>
>>>>>>>> I'm at a loss as to which route is recommended, and how to proceed, as both coder and schema options are causing different issues.
>>>>>>>>
>>>>>>>> Any help would be greatly appreciated, and thanks for your efforts on this project!
>>>>>>>>
>>>>>>>> The full `DoFn` I've referred to is further below, but it's worth noting that just having an essentially empty `DoFn` with both input and output of Beam `Row` types causes the same issue:
>>>>>>>>
>>>>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>>>>         @ProcessElement
>>>>>>>>         public void processElement(ProcessContext c) {
>>>>>>>>             Row row = c.element();
>>>>>>>>             c.output(row);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Here's the full implementation, if anyone can think of a better way to detect and replace `NULL` values returned from Beam SQL:
>>>>>>>>
>>>>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>>>>         @ProcessElement
>>>>>>>>         public void processElement(ProcessContext c) {
>>>>>>>>             Row row = c.element();
>>>>>>>>             List<String> fields = row.getSchema().getFieldNames();
>>>>>>>>             // Rebuild the row field by field against the same schema.
>>>>>>>>             Builder rowBuilder = Row.withSchema(row.getSchema());
>>>>>>>>
>>>>>>>>             for (String f: fields) {
>>>>>>>>                 Object value = row.getValue(f);
>>>>>>>>                 if (value != null && value instanceof Long) {
>>>>>>>>                     Long longVal = row.getInt64(f);
>>>>>>>>                     // Treat the Long extremes as "no value" and emit null instead.
>>>>>>>>                     if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
>>>>>>>>                         rowBuilder.addValue(null);
>>>>>>>>                     } else {
>>>>>>>>                         rowBuilder.addValue(value);
>>>>>>>>                     }
>>>>>>>>                 } else if (value != null && value instanceof Double) {
>>>>>>>>                     Double doubleVal = row.getDouble(f);
>>>>>>>>                     // Note: Double.MIN_VALUE is the smallest positive double, not the most negative one.
>>>>>>>>                     if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
>>>>>>>>                         rowBuilder.addValue(null);
>>>>>>>>                     } else {
>>>>>>>>                         rowBuilder.addValue(value);
>>>>>>>>                     }
>>>>>>>>                 } else {
>>>>>>>>                     // Any other type (or an existing null) is copied through unchanged.
>>>>>>>>                     rowBuilder.addValue(value);
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             Row newRow = rowBuilder.build();
>>>>>>>>             c.output(newRow);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> And here's the full callstack from the `setRowSchema` issue detailed above:
>>>>>>>>
>>>>>>>> > [ERROR] Failed to execute goal
>>>>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1]
>>>>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
>>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>>>>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>>>>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>>>>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
>>>>>>>> > Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. java.lang.IllegalStateException
>>>>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:311)
>>>>>>>> >     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:301)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
>>>>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
>>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>>>>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>>>>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>>>>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
>>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
>>>>>>>> > Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException
>>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:373)
>>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:341)
>>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:218)
>>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
>>>>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>>>>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>>>>>>>> >     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>>>>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>>>>>>>> >     at java.lang.Thread.run (Thread.java:748)
>>>>>>>> > Caused by: java.lang.IllegalStateException
>>>>>>>> >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:491)
>>>>>>>> >     at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate (RowCoderGenerator.java:314)
>>>>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
>>>>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
>>>>>>>> >     at org.apache.beam.sdk.schemas.SchemaCoder.encode (SchemaCoder.java:124)
>>>>>>>> >     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:85)
>>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:69)
>>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:54)
>>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
>>>>>>>> >     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
>>>>>>>> >     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
>>>>>>>> >     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
>>>>>>>> >     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
>>>>>>>> >     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:268)
>>>>>>>> >     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
>>>>>>>> >     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:416)
>>>>>>>> >     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:404)
>>>>>>>> >     at com.example.dataflow.Pipeline$HandleNullValues.processElement (CustomFunctions.java:310)
>>>>>>>>
>>>>>>>> Cheers!
>>>>>>>>
>>>>>>>
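The all-NULL result reported in this thread (MIN() returning 9223372036854775807) is consistent with what an accumulator initialised to a sentinel value would produce when every input is skipped. The following is a minimal, self-contained Java sketch of that effect and of a null-aware alternative. It does not use Beam and is not Beam's implementation; the class and method names (`NullAwareMin`, `naiveMin`, `nullAwareMin`) are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class NullAwareMin {

    // Naive MIN: starts from a sentinel and skips nulls, so an all-null
    // input leaks the sentinel (Long.MAX_VALUE) into the result.
    static Long naiveMin(List<Long> values) {
        long acc = Long.MAX_VALUE;
        for (Long v : values) {
            if (v != null) {
                acc = Math.min(acc, v);
            }
        }
        return acc;
    }

    // Null-aware MIN: returns null when no non-null value was seen.
    static Long nullAwareMin(List<Long> values) {
        Long acc = null;
        for (Long v : values) {
            if (v == null) {
                continue;
            }
            acc = (acc == null) ? v : Long.valueOf(Math.min(acc, v));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Long> allNull = Arrays.asList((Long) null, null, null);
        List<Long> mixed = Arrays.asList(1L, null, 2L, null, null, null, 3L);

        System.out.println(naiveMin(allNull));     // prints 9223372036854775807
        System.out.println(nullAwareMin(allNull)); // prints null
        System.out.println(nullAwareMin(mixed));   // prints 1
    }
}
```

The mixed input mirrors the f_int1 values from the testMin case quoted earlier in the thread (1, null, 2, null, null, null, 3). Separately, in Java `Double.MIN_VALUE` is the smallest positive double rather than the most negative one, so a check against it would not match a large negative sentinel such as `-Double.MAX_VALUE`; whether Beam SQL emits such a value for doubles is not shown in this thread.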
