Hi Reuven,

Thank you for the information. I'll try to replace my usages of Row with
pojos.

The schema is not recursive at all. At least it shouldn't be. But some
properties in the Pojo are Avro generated classes the other properties
might be primitives or other pojos. I'm not sure what properties are in
those Avro generated classes besides the fields I define. Maybe some auto
generated properties are recursive in some weird way.

On Fri., Sep. 10, 2021, 11:52 Reuven Lax, <re...@google.com> wrote:

> You shouldn't need to use Row - Beam schemas are designed specifically so
> that you can continue using POJOs or beans. If you have a POJO registered
> (with @DefaultSchema(JavaFieldSchema.class)) Beam should automatically
> allow you to use it in place of a Row object.
>
> One thing to keep in mind - we don't currently support recursive schemas.
> Seeing the stack trace you get, I wonder if you have recursive (or
> mutually recursive) fields.
>
> Reuven
>
> On Fri, Sep 10, 2021 at 12:53 AM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hi everyone,
>>
>> Every article and talk about Beam recommends using Schemas and Row.
>> However, using Row throughout my pipelines makes things very difficult to
>> refactor code when schemas change compared to POJOs/Beans that provide
>> static code analysis in the IDE.
>>
>> Does anyone have any tips or tricks to make refactoring easier when using
>> Row?
>>
>> For example: The initial pipeline transforms used a Person schema Row
>> {firstName:string, lastname:string}. There's some steps that do filtering,
>> and various other things. Now we realize that the Kafka metadata from the
>> Persons topic is also important, so our Row schema becomes Row {metadata:
>> Row {..whatever fields..}, item: Row {firstName: string, lastname:string}}.
>> What would be an easy way to figure out what code changes I need to make to
>> effectively add an "item." in front of the previous fields accessed by name?
>>
>> Search and replace isn't ideal. And for more complex pipelines it quickly
>> becomes very difficult to figure out which fields come from which nested
>> rows and if they need to change due to refactoring. In fact, the only way
>> to refactor is to run the pipeline multiple times and analyse the exact
>> schema at a given line of code before changing it accordingly, then
>> restarting the process with the breakpoint a little further in the pipeline.
>>
>> One solution I thought is to leave my PTransforms use Row in their
>> signature but the first and last steps of those PTransforms would be to use
>> Convert.to(POJO) and Convert.toRows(schema). Basically this provides a
>> static context inside the transforms. However, most of my schemas are
>> derived from Avro schemas, and when adding Avro objects to my POJOs,
>> Convert.to(POJO) throws a StackOverflowError ([1]stacktrace at the end of
>> the email)
>>
>> I guess I could Auto_Value the properties of my Avro objects and
>> use those automatically generated Auto_Values pojos inside my PTransforms.
>> However, that means I have to keep my Avro definitions and my Auto_Values
>> in sync.
>>
>> I understand why Schemas and Row are important, especially after hearing
>> Andrew talk about them at the 2021 Beam summit [3]. However, using Row
>> feels a lot like using DataTable [2] in .NET and that brings back
>> refactoring nightmares.
>>
>> Thanks,
>> Cristian
>>
>> [1]
>> Exception in thread "main" java.lang.StackOverflowError
>> at java.base/java.util.HashMap.hash(HashMap.java:339)
>> at java.base/java.util.HashMap.remove(HashMap.java:794)
>> at java.base/java.util.HashSet.remove(HashSet.java:236)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:88)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.getTypeMappings(TypeResolver.java:384)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver.covariantly(TypeResolver.java:75)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getCovariantTypeResolver(TypeToken.java:1181)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.resolveSupertype(TypeToken.java:273)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getSupertype(TypeToken.java:398)
>> at
>> org.apache.beam.sdk.values.TypeDescriptor.getSupertype(TypeDescriptor.java:188)
>> at
>> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:203)
>> at
>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:257)
>> at
>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:172)
>> at
>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>> at
>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>> at
>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>> at
>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>> at
>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>> at
>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>> at
>> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>> at
>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>> at
>> org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:67)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:92)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
>>
>> [2]
>> https://docs.microsoft.com/en-us/dotnet/api/system.data.datatable?view=net-5.0
>> [3] https://www.youtube.com/watch?v=4rDZ0b0TOvc
>>
>

Reply via email to