Some Java IO-connectors implement a class something like "class ReadAll extends 
PTransform<PCollection<Read>, PCollection<YourDocument>>” where “Read” is 
supposed to be configured dynamically. As a simple example, take a look on 
“SolrIO” [1] 

So, to support what you are looking for, “ReadAll”-pattern should be 
implemented for ElasticsearchIO.

—
Alexey

[1] 
https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java

> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user <user@beam.apache.org> 
> wrote:
> 
> I'm running into an issue using the ElasticsearchIO.read() to handle more 
> than one instance of a query. My queries are being dynamically built as a 
> PCollection based on an incoming group of values. I'm trying to see how to 
> load the .withQuery() parameter which could provide this capability or any 
> approach that provides flexibility.
>  
> The issue is that ElasticsearchIO.read() method expects a PBegin input to 
> start a pipeline, but it seems like I need access outside of a pipeline 
> context somehow. PBegin represents the beginning of a pipeline, and it's 
> required to create a pipeline that can read data from Elasticsearch using 
> IOElasticsearchIO.read().
>  
> Can I wrap the ElasticsearchIO.read() call in a Create transform that creates 
> a PCollection with a single element (e.g., PBegin) to simulate the beginning 
> of a pipeline or something similar?
>  
> Here is my naive attempt without accepting the reality of PBegin:
>    PCollection<String> queries = ... // a PCollection of Elasticsearch queries
>    
>     PCollection<String> queryResults = queries.apply(
>         ParDo.of(new DoFn<String, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String query = c.element();
>                 PCollection<String> results = c.pipeline()
>                     .apply(ElasticsearchIO.read()
>                         .withConnectionConfiguration(
>                             
> ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                         .withQuery(query));
>                 c.output(results);
>             }
>         })
>     .apply(Flatten.pCollections()));
>  
>  
> In general I'm wondering for any of IO-related classes proved by Beam that 
> conforms to PBegin input -- if there is a means to introduce a collection.
> 
>  
> 
> Here is one approach that might be promising:
> 
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider = 
> ValueProvider.StaticValueProvider.of(myList);
>  
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
> ListCoder.of()));
>  
> PCollection<String> partitionData = PBegin.in(pipeline)
>         .apply("Read data from Elasticsearch", 
> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider<String>
>  pcoll).withScrollKeepalive("1m").withBatchSize(50))
>         .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), 
> opt.getMedTagVersion(), opt.getNoteType()));
>  
> Any thoughts or ideas would be great.   Thanks, ~Sean

Reply via email to