Hi Sachin,

Can you post the error that you are getting? It should provide some
additional information, including the path to the offending object.
If you are trying to use DataClient inside the pipeline (in a PTransform,
DoFn, etc.), you have to initialize that client inside the DoFn itself,
e.g., in a method annotated with @Setup
<https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Setup.html>.
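
To make the @Setup suggestion concrete, here is a minimal plain-Java sketch
of the idea, written without Beam on the classpath so it runs on its own.
Beam requires function objects such as DoFns to be serializable, so a
non-serializable member has to be transient and created on the worker.
FakeClient, FetchFn, setup() and SetupSketch are hypothetical names for
illustration only; in a real pipeline FetchFn would extend DoFn and setup()
would carry the @Setup annotation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a third-party client that is NOT Serializable.
class FakeClient {
    String fetch(int offset) {
        return "page@" + offset;
    }
}

// Stand-in for a DoFn: the function object is Serializable, while the
// client is transient and only created after deserialization.
class FetchFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient FakeClient client;

    // In Beam this method would be annotated with @Setup (assumption: the
    // real client can be constructed on the worker without extra state).
    void setup() {
        client = new FakeClient();
    }

    String process(int offset) {
        return client.fetch(offset);
    }
}

class SetupSketch {
    // Serialize and deserialize a value, roughly what happens when a
    // function object is shipped to a worker.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        FetchFn onWorker = roundTrip(new FetchFn()); // no client exists yet
        onWorker.setup();                            // client is built on the "worker"
        System.out.println(onWorker.process(20));
    }
}
```

The point of the design is that the client never crosses the serialization
boundary: only the (serializable) configuration needed to build it does.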

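If you are not using the client inside the pipeline on purpose, are you by
any chance using lambdas in your transforms in the same class that holds
that client as a member? A lambda that touches instance state captures the
enclosing object, which then has to be serializable as well.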
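
As a rough illustration of the lambda question, the plain-Java sketch below
(no Beam involved) shows how a lambda that reads an instance field drags the
whole enclosing object into serialization, while a lambda that reads a local
copy does not. PipelineHolder, ThirdPartyClient, SerFn and
LambdaCaptureSketch are hypothetical names made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// A serializable function type, similar in spirit to Beam's function interfaces.
interface SerFn<A, B> extends Function<A, B>, Serializable {}

// Hypothetical third-party client that is NOT Serializable.
class ThirdPartyClient {}

// Not Serializable, and it holds the client as a member.
class PipelineHolder {
    private final ThirdPartyClient client = new ThirdPartyClient();
    private final String prefix;

    PipelineHolder(String prefix) {
        this.prefix = prefix;
    }

    // Reads an instance field, so the lambda captures `this`
    // (and with it the non-serializable holder and its client).
    SerFn<Integer, String> capturingThis() {
        return i -> prefix + i;
    }

    // Copies the field to a local first, so only the String is captured.
    SerFn<Integer, String> capturingLocal() {
        String p = prefix;
        return i -> p + i;
    }
}

class LambdaCaptureSketch {
    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PipelineHolder holder = new PipelineHolder("item-");
        System.out.println("captures this:  " + canSerialize(holder.capturingThis()));
        System.out.println("captures local: " + canSerialize(holder.capturingLocal()));
    }
}
```

If the error you see mentions the enclosing class rather than the client
itself, this kind of implicit capture is a likely cause.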

Please take a look here for a bit of context:
https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms


Best,
Bruno

On Tue, May 9, 2023 at 12:30 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> I am trying to create a pipeline where I query paginated data from some
> external service via a client and join them into a PCollectionList and
> flatten it to get the final collection of data items.
> The data class is encoded using a ProtoCoder.
>
> Here is my code:
>
> -------------------------------------------------------------------------------------------------------------------------
> private static PCollection<Data> loadAllData(
>     Pipeline pipeline, DataClient client, Instant from, Instant to) {
>
>   // Fetch the first page to learn the total item count and the page size.
>   PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);
>   int total = paginatedData.meta().total();
>   int limit = paginatedData.meta().limit();
>
>   List<PCollection<Data>> collections = new ArrayList<>();
>
>   PCollection<Data> collection =
>       pipeline.apply(
>           Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>   collections.add(collection);
>
>   // Fetch the remaining pages, one Create per page.
>   for (int i = 1; i < total / limit + 1; i++) {
>     paginatedData = getPaginatedData(client, from, to, i * limit);
>     collection =
>         pipeline.apply(
>             Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
>     collections.add(collection);
>   }
>   PCollectionList<Data> list = PCollectionList.of(collections);
>
>   return list
>       .apply(Flatten.pCollections())
>       .setCoder(ProtoCoder.of(Data.class));
> }
> ---------------------------------------------------------------------------
>
> When I run this pipeline, it complains at each step that objects such as
> DataClient have to be serializable.
> Even objects created inside the above method, for example PaginatedList,
> have to be serializable.
>
> However, many of the classes I use, like DataClient and PaginatedList, are
> part of a third-party library and do not implement Serializable. Is there
> any way to ensure Beam is able to serialize them?
>
> More generally, when designing a pipeline, how can we identify which
> objects will have to be serialized?
>
> Instead of this static method, if I create a non-static method and
> implement Serializable in the base class that contains it, would that
> help?
>
> Thanks
> Sachin
>
>
>
