For more info on splitable DoFn, there is a good resource on the beam
blog[1].  Alexey has also shown a great alternative!

[1] https://beam.apache.org/blog/splittable-do-fn/

On Thu, Apr 20, 2023 at 9:08 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Some Java IO-connectors implement a class something like "class ReadAll
> extends PTransform<PCollection<Read>, PCollection<YourDocument>>” where
> “Read” is supposed to be configured dynamically. As a simple example, take
> a look on “SolrIO” [1]
>
> So, to support what you are looking for, “ReadAll”-pattern should be
> implemented for ElasticsearchIO.
>
> —
> Alexey
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
>
> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user <user@beam.apache.org>
> wrote:
>
> I'm running into an issue using the ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how
> to load the .withQuery() parameter which could provide this capability or
> any approach that provides flexibility.
>
> The issue is that ElasticsearchIO.read() method expects a PBegin input to
> start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> IOElasticsearchIO.read().
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that
> creates a PCollection with a single element (e.g., PBegin) to simulate the
> beginning of a pipeline or something similar?
>
> Here is my naive attempt without accepting the reality of PBegin:
>    PCollection<String> queries = ... // a PCollection of Elasticsearch
> queries
>
>     PCollection<String> queryResults = queries.apply(
>         ParDo.of(new DoFn<String, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String query = c.element();
>                 PCollection<String> results = c.pipeline()
>                     .apply(ElasticsearchIO.read()
>                         .withConnectionConfiguration(
>
> ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                         .withQuery(query));
>                 c.output(results);
>             }
>         })
>     .apply(Flatten.pCollections()));
>
>
>
> In general I'm wondering for any of IO-related classes proved by Beam that
> conforms to PBegin input -- if there is a means to introduce a collection.
>
>
>
> Here is one approach that might be promising:
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider =
> ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll =
> pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>
> PCollection<String> partitionData = PBegin.in(pipeline)
>         .apply("Read data from Elasticsearch", 
> ElasticsearchIO.*read*().withConnectionConfiguration(connConfig).withQuery(ValueProvider<String>
> pcoll).withScrollKeepalive("1m").withBatchSize(50))
>         .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
> opt.getMedTagVersion(), opt.getNoteType()));
>
> Any thoughts or ideas would be great.   Thanks, ~Sean
>
>
>

Reply via email to