I am not using DataClient inside the pipeline; I pass it externally to a
static method, which creates a PCollection via pipeline.apply
transformations (as shown in my previous email).
For now I made DataClient serializable just to make progress; later on I
was planning to initialize the client within the transformation function
itself.
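
Roughly what I had in mind for later is the sketch below (untested;
LoadPageFn is a made-up name, DataClient.create() stands in for however the
client is actually built, and the Integer page-offset element type is just
illustrative):

private static class LoadPageFn extends DoFn<Integer, Data> {
  private final Instant from;
  private final Instant to;
  // transient: the non-serializable client is never shipped with the DoFn;
  // it is re-created on each worker in @Setup instead.
  private transient DataClient client;

  LoadPageFn(Instant from, Instant to) {
    this.from = from;
    this.to = to;
  }

  @Setup
  public void setup() {
    client = DataClient.create(); // placeholder for the real factory
  }

  @ProcessElement
  public void processElement(@Element Integer offset, OutputReceiver<Data> out) {
    // One input element per page offset; emit every Data item on that page.
    getPaginatedData(client, from, to, offset).data().forEach(out::output);
  }
}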

However, in that static method I also create more objects, for example:

PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);

and pass the data from this object to:

pipeline.apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));

The elements of the data list are serialized using ProtoCoder, but the
PaginatedList object itself is not.
However, I am not using PaginatedList anywhere in my pipeline (the code is
in the quoted email below).
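
If it helps, one workaround I was considering (untested on my side) is to
copy each page out of the third-party wrapper before handing it to Create,
so nothing in the pipeline graph keeps a reference back to PaginatedList:

// Defensive copy: the ArrayList is serializable and holds no reference
// to the ImmutablePaginatedList it came from.
List<Data> page = new ArrayList<>(paginatedData.data());
PCollection<Data> collection =
    pipeline.apply(Create.of(page).withCoder(ProtoCoder.of(Data.class)));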

The error I get is something like this:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn@410d4413, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:736)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:876)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:787)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:802)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
        at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
        at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
        at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
        at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
        ..............


        Caused by: java.io.NotSerializableException: data.model.ImmutablePaginatedList
            at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
            at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
            at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
            at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
            ......................

Hope this gives more context.

Thanks
Sachin


On Tue, May 9, 2023 at 10:12 AM Bruno Volpato via user <user@beam.apache.org>
wrote:

> Hi Sachin,
>
> Can you post the error that you are getting? It should provide some
> additional information / a path forward.
> If you are trying to use DataClient on the pipeline (inside a PTransform,
> DoFn, etc), you would have to initialize that client inside the DoFn itself
> (e.g., @Setup
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Setup.html>
> ).
>
> If you are not doing that on purpose: are you, by any chance, using a
> lambda in your transforms in the same class that holds that client as a
> member?
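>
> For illustration (all names hypothetical), a pattern like this would pull
> the whole enclosing instance, client included, into the serialized DoFn:
>
> class MyJob { // not Serializable, and holds a non-serializable member
>   private DataClient client; // initialized elsewhere
>
>   PCollection<Data> expand(PCollection<Data> in) {
>     // The anonymous DoFn references an instance member, so it captures
>     // MyJob.this, and Beam then tries to serialize MyJob (and client).
>     return in.apply(ParDo.of(new DoFn<Data, Data>() {
>       @ProcessElement
>       public void process(@Element Data d, OutputReceiver<Data> out) {
>         out.output(client.enrich(d)); // client.enrich() is made up
>       }
>     }));
>   }
> }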
>
> Please take a look here for a bit of context:
> https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
>
>
> Best,
> Bruno
>
> On Tue, May 9, 2023 at 12:30 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> I am trying to create a pipeline where I query paginated data from some
>> external service via a client and join them into a PCollectionList and
>> flatten it to get the final collection of data items.
>> The data class is encoded using a ProtoCoder
>>
>> Here is my code:
>>
>> -------------------------------------------------------------------------------------------------------------------------
>> private static PCollection<Data> loadAllData(
>>     Pipeline pipeline, DataClient client, Instant from, Instant to) {
>>
>>   // Fetch the first page to learn the total count and page size.
>>   PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);
>>   int total = paginatedData.meta().total();
>>   int limit = paginatedData.meta().limit();
>>
>>   List<PCollection<Data>> collections = new ArrayList<>();
>>
>>   PCollection<Data> collection =
>>       pipeline.apply(
>>           Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>>   collections.add(collection);
>>
>>   // Fetch the remaining pages, one PCollection per page.
>>   for (int i = 1; i < total / limit + 1; i++) {
>>     paginatedData = getPaginatedData(client, from, to, i * limit);
>>     collection =
>>         pipeline.apply(
>>             Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>>     collections.add(collection);
>>   }
>>
>>   PCollectionList<Data> list = PCollectionList.of(collections);
>>
>>   return list
>>       .apply(Flatten.pCollections())
>>       .setCoder(ProtoCoder.of(Data.class));
>> }
>>
>> ---------------------------------------------------------------------------
>>
>> When I run this pipeline, it complains at each step that DataClient has
>> to be serializable.
>> Even objects created in the above method have to be serializable, for
>> example PaginatedList.
>>
>> However, many of the classes I use, like DataClient and PaginatedList,
>> are part of a third-party library and don't implement Serializable. So
>> is there any way to ensure Beam is able to serialize them?
>>
>> In general, when designing a pipeline, how can we identify which
>> objects will have to be serialized?
>>
>> Instead of this static method, if I create a non-static method and
>> implement Serializable in the base class that contains it, would that
>> help?
>>
>> Thanks
>> Sachin
>>
>>
>>
