Hi Sachin,

Can you post the error that you are getting? It should provide some additional information, such as the path to the object that fails to serialize.

If you are trying to use DataClient inside the pipeline (in a PTransform, DoFn, etc.), you have to initialize that client inside the DoFn itself (e.g., in a method annotated with @Setup <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Setup.html>).
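
To illustrate the idea of initializing the client inside the DoFn, here is a rough sketch in plain Java, without Beam on the classpath. It only mimics the serialize-then-set-up life cycle that a function goes through when it is shipped to a worker. FakeDataClient, EagerFn, LazyFn and the endpoint URL are all made up for the example; in a real pipeline LazyFn would extend DoFn and setup() would carry the @Setup annotation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a third-party client that does NOT implement Serializable.
class FakeDataClient {
    private final String endpoint;

    FakeDataClient(String endpoint) {
        this.endpoint = endpoint;
    }

    String fetch(int offset) {
        return endpoint + "?offset=" + offset;
    }
}

// Anti-pattern: the client is part of the serialized state, so serialization fails.
class EagerFn implements Serializable {
    private final FakeDataClient client = new FakeDataClient("https://example.invalid/data");

    String process(int offset) {
        return client.fetch(offset);
    }
}

// Setup pattern: only serializable configuration travels with the function;
// the client is transient and is created after deserialization.
class LazyFn implements Serializable {
    private final String endpoint;
    private transient FakeDataClient client;

    LazyFn(String endpoint) {
        this.endpoint = endpoint;
    }

    // In a Beam DoFn this method would be annotated with @Setup.
    void setup() {
        client = new FakeDataClient(endpoint);
    }

    String process(int offset) {
        return client.fetch(offset);
    }
}

class SetupPatternDemo {
    // Serializes and deserializes a value, similar in spirit to shipping a function to a worker.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the value cannot be serialized because of a non-serializable field.
    static boolean failsToSerialize(Serializable value) {
        try {
            roundTrip(value);
            return false;
        } catch (IllegalStateException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }

    public static void main(String[] args) {
        // prints "EagerFn fails to serialize: true"
        System.out.println("EagerFn fails to serialize: " + failsToSerialize(new EagerFn()));

        LazyFn onWorker = roundTrip(new LazyFn("https://example.invalid/data"));
        onWorker.setup(); // the client is created only now, on the "worker" side
        // prints "https://example.invalid/data?offset=20"
        System.out.println(onWorker.process(20));
    }
}
```

The point of the sketch is that only the endpoint string needs to be serializable. The client itself never crosses the serialization boundary, so it does not matter that the third-party class does not implement Serializable.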

If you are not doing that on purpose, are you by any chance using lambdas in your transforms in the same class that holds that client as a member?

Please take a look here for a bit of context:
https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Best,
Bruno

On Tue, May 9, 2023 at 12:30 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> I am trying to create a pipeline where I query paginated data from some
> external service via a client, collect the pages into a PCollectionList,
> and flatten it to get the final collection of data items.
> The data class is encoded using a ProtoCoder.
>
> Here is my code:
>
> ---------------------------------------------------------------------------
> private static PCollection<Data> loadAllData(
>     Pipeline pipeline, DataClient client, Instant from, Instant to) {
>
>   PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);
>   int total = paginatedData.meta().total();
>   int limit = paginatedData.meta().limit();
>
>   List<PCollection<Data>> collections = new ArrayList<>();
>
>   PCollection<Data> collection =
>       pipeline
>           .apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>   collections.add(collection);
>
>   for (int i = 1; i < total / limit + 1; i++) {
>     paginatedData = getPaginatedData(client, from, to, i * limit);
>     collection =
>         pipeline
>             .apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>     collections.add(collection);
>   }
>   PCollectionList<Data> list = PCollectionList.of(collections);
>
>   return list
>       .apply(Flatten.pCollections())
>       .setCoder(ProtoCoder.of(Data.class));
> }
> ---------------------------------------------------------------------------
>
> When I run this pipeline, it complains at each step that DataClient
> has to be serializable.
> Even objects created inside the above method have to be serializable, for
> example PaginatedList.
>
> However, many of the classes I use, like DataClient and PaginatedList, are
> part of a third-party library and
> they don't implement Serializable. So is there any way to ensure Beam is
> able to serialize them?
>
> In general, when designing a pipeline, how can we identify which
> objects have to be serialized?
>
> Instead of creating this static method, if I create a non-static method
> and implement Serializable in the class that contains this method,
> would this help?
>
> Thanks
> Sachin