Hi Reuven,
Thanks for the quick reply.
Could you elaborate on why Beam needs to create the Builder dynamically
through reflection (essentially using reflection to instantiate a
package-private class)? As far as AutoValue goes, it looks like an
anti-pattern to instantiate the generated builder by referencing the
AutoValue-generated class (AutoValue_App_FooAutoValue in this case). I
think that normally the only place that should call the generated
builder's constructor is the user's abstract class (FooAutoValue),
through:
public static Builder builder() {
    return new AutoValue_App_FooAutoValue.Builder();
}
In fact, this method is called directly when using the @SchemaCreate
approach, regardless of whether the create method is invoked through
reflection or not. I guess what I'm asking is: could Beam not call
FooAutoValue.builder() dynamically if calling it directly is not possible?
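
To illustrate the idea, something along these lines (just a sketch against
the public abstract types, not a proposal for Beam's actual implementation)
never has to name the package-private generated Builder at all:

// Sketch only: go through the public static factory and the public abstract
// Builder type, so the generated package-private class is never referenced.
static FooAutoValue createReflectively(String dummyProp) throws Exception {
    java.lang.reflect.Method factory = FooAutoValue.class.getMethod("builder");
    Object builder = factory.invoke(null); // instance of the generated Builder
    Class<?> builderType = FooAutoValue.Builder.class; // public abstract type
    builderType.getMethod("setDummyProp", String.class).invoke(builder, dummyProp);
    return (FooAutoValue) builderType.getMethod("build").invoke(builder);
}
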
On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax <[email protected]> wrote:
> Beam needs to create these elements dynamically when decoding records, so
> it can't easily call the builder directly.
>
> My first guess is that there's a classloader issue here. Flink does some
> fancy classloader munging, and that might be breaking an assumption in this
> code. Passing in the correct classloader should hopefully fix this.
>
> Reuven
>
>
> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <[email protected]>
> wrote:
>
>> Hi everyone,
>>
>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>> builders. For me, it doesn't work. I have some questions and a workaround
>> for anyone in the same boat.
>>
>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>
>> Here's the code:
>>
>> package org.whatever.testing;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>> import org.apache.beam.sdk.schemas.transforms.Convert;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.values.TypeDescriptor;
>>
>> import java.util.Arrays;
>>
>> public class App {
>>
>>     public static void main(String[] args) {
>>         var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>
>>         var p = Pipeline.create(options);
>>         p
>>             .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
>>             .apply(Convert.to(FooAutoValue.class))
>>             .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>                 System.out.println(i.toString());
>>                 return i;
>>             }));
>>         p.run().waitUntilFinish();
>>     }
>>
>>     @AutoValue
>>     @DefaultSchema(AutoValueSchema.class)
>>     public static abstract class FooAutoValue {
>>         public abstract String getDummyProp();
>>
>>         // @SchemaCreate
>>         // public static FooAutoValue create(String dummyProp) {
>>         //     return builder()
>>         //         .setDummyProp(dummyProp)
>>         //         .build();
>>         // }
>>
>>         public static Builder builder() {
>>             return new AutoValue_App_FooAutoValue.Builder();
>>         }
>>
>>         @AutoValue.Builder
>>         public abstract static class Builder {
>>             public abstract Builder setDummyProp(String newDummyProp);
>>
>>             public abstract FooAutoValue build();
>>         }
>>     }
>> }
>>
>> Note that it doesn't matter whether FooAutoValue is a nested class or a
>> top-level class in its own file. For simplicity I'm converting the
>> objects to the same class here; in prod code the input is of another
>> type with an equivalent schema.
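>>
>> Roughly what the production shape looks like (names made up for
>> illustration), with an input type whose schema is equivalent:
>>
>> @AutoValue
>> @DefaultSchema(AutoValueSchema.class)
>> public static abstract class BarInput {
>>     public abstract String getDummyProp();
>>
>>     @SchemaCreate
>>     public static BarInput create(String dummyProp) {
>>         return new AutoValue_App_BarInput(dummyProp);
>>     }
>> }
>>
>> // ... and in the pipeline:
>> // .apply(Create.of(BarInput.create("dummy")))
>> // .apply(Convert.to(FooAutoValue.class))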
>>
>> And the stack trace:
>>
>> Caused by: java.lang.IllegalAccessError: failed to access class
>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>> module of loader 'app';
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
>> unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
>> @26f4afda)
>> at
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
>> at
>> org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
>> at
>> org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
>> at
>> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
>> at
>> org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>>
>> Workaround:
>> - Uncomment the @SchemaCreate create method
>> - Compile the code with "-parameters" so the creator's parameter names are
>> retained (Beam matches them to the schema fields by name). If using Maven:
>>
>> <plugin>
>>   <groupId>org.apache.maven.plugins</groupId>
>>   <artifactId>maven-compiler-plugin</artifactId>
>>   <version>3.8.1</version>
>>   <configuration>
>>     <compilerArgs>
>>       <arg>-parameters</arg>
>>     </compilerArgs>
>>     ....
>>
>>
>> My questions:
>> 1. Why does Beam obtain the generated AutoValue Builder through
>> reflection using AutoValueUtils.getAutoValueGeneratedName (which builds
>> "AutoValue_App_FooAutoValue$Builder") instead of calling
>> FooAutoValue.builder() directly, without reflection?
>> 2. Given the fancy classloader work Flink does [1], in
>> AutoValueUtils.createBuilderCreator there is a call to
>> ReflectHelpers.findClassLoader(), which returns the thread classloader,
>> in this case a ChildFirstClassLoader. However, the builder is
>> package-private and was loaded by the app classloader. Is there a reason
>> why Beam registers the generated creator on the thread classloader?
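>>
>> To make the mismatch concrete (simplified illustration, not the actual
>> Beam code): package-private access only works when both classes end up
>> in the same runtime package, i.e. the same package name and the same
>> defining classloader, which is exactly what breaks here:
>>
>> // The generated creator gets defined in the thread context classloader
>> // (ReflectHelpers.findClassLoader() -> Flink's ChildFirstClassLoader here):
>> ClassLoader creatorLoader = Thread.currentThread().getContextClassLoader();
>> // ...while the AutoValue-generated Builder was loaded by the 'app' loader:
>> ClassLoader builderLoader = FooAutoValue.class.getClassLoader();
>> // Same package name but different defining loaders means different runtime
>> // packages, so the package-private Builder constructor is not accessible
>> // from the generated creator, hence the IllegalAccessError above.
>> System.out.println(creatorLoader == builderLoader); // false in this setup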
>>
>> As per usual, if this is deemed a valid bug, please let me know and I can
>> eventually make a PR fixing it.
>>
>> Thank you,
>> Cristian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
>>
>