I am not using DataClient inside the pipeline; I pass it externally to a static method and create a PCollection using pipeline.apply transformations (as shown in my previous email). For now I have made DataClient serializable just to make progress; later on I was planning to initialize that client within the transformation function.
However, in that static method I also create more objects, like:

PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);

and pass the data from this object to:

pipeline.apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));

The elements of the data list are serialized using ProtoCoder, but not the PaginatedList object itself. However, I am not using PaginatedList anywhere in my pipeline (I have pasted the code above).

The error I get is something like this:

java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn@410d4413, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:736)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:876)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:787)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:802)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
    at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    ..............
Caused by: java.io.NotSerializableException: data.model.ImmutablePaginatedList
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    ......................

Hope this gives more context.
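For reference, the refactor I was planning is something like the sketch below (not tested; `createDataClient`, `PAGE_LIMIT`, and the page-count plumbing are placeholders for whatever the real client API provides). It passes only page numbers through the pipeline and builds the client inside the DoFn via @Setup, so neither DataClient nor PaginatedList would ever need to be Java-serialized:

```java
// Sketch only, not tested. Plain Integer page indexes are the only values
// carried by the pipeline; DataClient is created per worker instance in
// @Setup, and PaginatedList never leaves processElement, so neither class
// needs to implement Serializable.
private static PCollection<Data> loadAllData(
    Pipeline pipeline, Instant from, Instant to, int pageCount) {
  List<Integer> pages = new ArrayList<>();
  for (int i = 0; i < pageCount; i++) {
    pages.add(i);
  }
  return pipeline
      .apply(Create.of(pages))
      .apply(ParDo.of(new DoFn<Integer, Data>() {
        private transient DataClient client;

        @Setup
        public void setup() {
          client = createDataClient(); // placeholder for the real factory
        }

        @ProcessElement
        public void processElement(@Element Integer page, OutputReceiver<Data> out) {
          // PAGE_LIMIT is a placeholder for the page size the service uses.
          PaginatedList<Data> pageData =
              getPaginatedData(client, from, to, page * PAGE_LIMIT);
          pageData.data().forEach(out::output);
        }
      }))
      .setCoder(ProtoCoder.of(Data.class));
}
```

In the meantime, copying paginatedData.data() into a plain ArrayList before handing it to PAssert might also help, if the list returned by data() holds a reference back to the ImmutablePaginatedList wrapper that the assertion's closure then captures.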
Thanks
Sachin

On Tue, May 9, 2023 at 10:12 AM Bruno Volpato via user <user@beam.apache.org> wrote:

> Hi Sachin,
>
> Can you post the error that you are getting? It should provide some
> additional information / path.
> If you are trying to use DataClient on the pipeline (inside a PTransform,
> DoFn, etc.), you would have to initialize that client inside the DoFn itself
> (e.g., @Setup
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Setup.html>
> ).
>
> If not doing that on purpose, are you using a lambda in your transforms in
> the same class that holds that client as a member, by any chance?
>
> Please take a look here for a bit of context:
> https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
>
> Best,
> Bruno
>
> On Tue, May 9, 2023 at 12:30 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> I am trying to create a pipeline where I query paginated data from some
>> external service via a client, join the pages into a PCollectionList, and
>> flatten it to get the final collection of data items.
>> The data class is encoded using a ProtoCoder.
>>
>> Here is my code:
>>
>> -------------------------------------------------------------------------------------------------------------------------
>> private static PCollection<Data> loadAllData(
>>     Pipeline pipeline, DataClient client, Instant from, Instant to) {
>>
>>   PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);
>>   int total = paginatedData.meta().total();
>>   int limit = paginatedData.meta().limit();
>>
>>   List<PCollection<Data>> collections = new ArrayList<>();
>>
>>   PCollection<Data> collection =
>>       pipeline.apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>>   collections.add(collection);
>>
>>   for (int i = 1; i < total / limit + 1; i++) {
>>     paginatedData = getPaginatedData(client, from, to, i * limit);
>>     collection =
>>         pipeline.apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>>     collections.add(collection);
>>   }
>>
>>   PCollectionList<Data> list = PCollectionList.of(collections);
>>
>>   return list
>>       .apply(Flatten.pCollections())
>>       .setCoder(ProtoCoder.of(Data.class));
>> }
>> ---------------------------------------------------------------------------
>>
>> When I run this pipeline it complains at each step that DataClient
>> has to be serializable.
>> Even objects created in the above method have to be serializable, for
>> example PaginatedList.
>>
>> However, many of the classes I use, like DataClient and PaginatedList, are
>> part of a third-party library and they don't implement Serializable. So is
>> there any way to ensure Beam is able to serialize them?
>>
>> In general, when designing a pipeline, how can we identify which objects
>> will have to be serialized?
>>
>> Instead of creating this static method, if I create a non-static method
>> and implement Serializable in the base class which contains this method,
>> would this help?
>>
>> Thanks
>> Sachin