For more info on splittable DoFn, there is a good resource on the Beam blog [1]. Alexey has also shown a great alternative!
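For the query-per-element case, a splittable DoFn along the lines of the blog post could look roughly like this. This is only a sketch: countPages() and fetchPage() are hypothetical helpers you would implement with an Elasticsearch client, and error handling is omitted.

```java
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Reads one Elasticsearch query per input element, splitting the work
// by result page so the runner can parallelize large result sets.
class PaginatedQueryFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String query) {
    // One restriction unit per page of results for this query.
    return new OffsetRange(0, countPages(query));
  }

  @ProcessElement
  public void processElement(
      @Element String query,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // Claim pages one at a time; the runner may split the remaining range.
    for (long page = tracker.currentRestriction().getFrom();
        tracker.tryClaim(page);
        page++) {
      for (String hit : fetchPage(query, page)) {
        out.output(hit);
      }
    }
  }

  // Hypothetical helpers -- replace with real Elasticsearch client calls.
  private long countPages(String query) { return 0; }

  private List<String> fetchPage(String query, long page) {
    return Collections.emptyList();
  }
}
```

You would then apply it as queries.apply(ParDo.of(new PaginatedQueryFn())), which sidesteps the PBegin constraint entirely because the source of work is an ordinary PCollection of query strings.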
[1] https://beam.apache.org/blog/splittable-do-fn/

On Thu, Apr 20, 2023 at 9:08 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Some Java IO-connectors implement a class something like "class ReadAll
> extends PTransform<PCollection<Read>, PCollection<YourDocument>>", where
> "Read" is supposed to be configured dynamically. As a simple example, take
> a look at "SolrIO" [1].
>
> So, to support what you are looking for, the "ReadAll" pattern should be
> implemented for ElasticsearchIO.
>
> —
> Alexey
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
>
> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user <user@beam.apache.org> wrote:
>
> I'm running into an issue using ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how
> to load the .withQuery() parameter in a way that could provide this
> capability, or any other approach that provides flexibility.
>
> The issue is that the ElasticsearchIO.read() method expects a PBegin input
> to start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> ElasticsearchIO.read().
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that
> creates a PCollection with a single element (e.g., PBegin) to simulate the
> beginning of a pipeline, or something similar?
>
> Here is my naive attempt, without accepting the reality of PBegin:
>
> PCollection<String> queries = ...; // a PCollection of Elasticsearch queries
>
> PCollection<String> queryResults = queries
>     .apply(ParDo.of(new DoFn<String, String>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         String query = c.element();
>         PCollection<String> results = c.pipeline()
>             .apply(ElasticsearchIO.read()
>                 .withConnectionConfiguration(
>                     ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                 .withQuery(query));
>         c.output(results);
>       }
>     }))
>     .apply(Flatten.pCollections());
>
> In general, I'm wondering, for any of the IO-related classes provided by
> Beam that conform to a PBegin input, whether there is a means to introduce
> a collection.
>
> Here is one approach that might be promising:
>
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider =
>     ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll =
>     pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>
> PCollection<String> partitionData = PBegin.in(pipeline)
>     .apply("Read data from Elasticsearch",
>         ElasticsearchIO.read()
>             .withConnectionConfiguration(connConfig)
>             .withQuery(ValueProvider<String> pcoll)
>             .withScrollKeepalive("1m")
>             .withBatchSize(50))
>     .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>         opt.getMedTagVersion(), opt.getNoteType()));
>
> Any thoughts or ideas would be great.
>
> Thanks,
> ~Sean
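For reference, the ReadAll pattern Alexey points to looks roughly like the following when used with SolrIO (based on the SolrIO source linked above). ElasticsearchIO does not expose an equivalent readAll() today, so this only illustrates the shape of the pattern; conn and the collection name are assumed placeholders.

```java
// Each element of the input PCollection is a fully configured Read,
// so the query is decided at pipeline runtime rather than construction time.
PCollection<String> queries = pipeline.apply(Create.of("q1", "q2"));

PCollection<SolrDocument> results = queries
    .apply("BuildReads", MapElements.via(
        new SimpleFunction<String, SolrIO.Read>() {
          @Override
          public SolrIO.Read apply(String query) {
            return SolrIO.read()
                .withConnectionConfiguration(conn)  // conn defined elsewhere
                .from("my-collection")              // placeholder collection
                .withQuery(query);
          }
        }))
    // readAll() takes PCollection<Read> instead of PBegin -- this is
    // exactly the capability the thread is asking for in ElasticsearchIO.
    .apply(SolrIO.readAll());
```

The key design point is that the connector's Read configuration becomes data flowing through the pipeline, which is why no PBegin is needed.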