Hi Sean,

I'm not an expert, but I think the .withQuery() function is part of the
build stage rather than the runtime stage.
ElasticsearchIO is designed so that you set the query while the pipeline is
being built. You cannot set it at runtime, which means you cannot run a
query dynamically based on the element being processed within the pipeline.

To do something like that, the transform must be designed more like FileIO
in this example (from
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html):

>  PCollection<KV<String, String>> filesAndContents = p
>      .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
>      // withCompression can be omitted - by default compression is detected
>      // from the filename.
>      .apply(FileIO.readMatches().withCompression(GZIP))
>      .apply(MapElements
>          // uses imports from TypeDescriptors
>          .into(kvs(strings(), strings()))
>          .via((ReadableFile f) -> {
>            try {
>              return KV.of(
>                  f.getMetadata().resourceId().toString(),
>                  f.readFullyAsUTF8String());
>            } catch (IOException ex) {
>              throw new RuntimeException("Failed to read the file", ex);
>            }
>          }));
>
>
If you look at how FileIO.readMatches() works, it doesn't set the filename
when building the pipeline. Instead, it receives each file as an input
element within its ProcessElement function.

See here
<https://github.com/apache/beam/blob/bd8950176db0116221a1b739a3916da26d822f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L873>
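
To make the distinction concrete, here is a rough sketch in plain Java. It is
only a simplified model and does not use Beam or Elasticsearch at all: the
names QueryTimingSketch, fakeSearch, FixedQueryRead and readAll are all made
up for illustration. The first style fixes the query when the object is
constructed, which is how I understand .withQuery() to behave; the second
receives each query as an input element, which is the shape
FileIO.readMatches() has.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: nothing here is a Beam or Elasticsearch API.
class QueryTimingSketch {

    // Hypothetical stand-in for an Elasticsearch search call.
    static List<String> fakeSearch(String query) {
        return List.of("hit for " + query);
    }

    // Build-time style: the query is captured when the reader is constructed,
    // so every read() runs the same query.
    static final class FixedQueryRead {
        private final String query;

        FixedQueryRead(String query) {
            this.query = query;
        }

        List<String> read() {
            return fakeSearch(query);
        }
    }

    // Runtime style: each query arrives as an element to process,
    // so the set of queries can be computed earlier in the flow.
    static List<String> readAll(List<String> queries) {
        List<String> results = new ArrayList<>();
        for (String query : queries) {
            results.addAll(fakeSearch(query));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(new FixedQueryRead("q0").read());
        System.out.println(readAll(List.of("q1", "q2")));
    }
}
```

In a real pipeline the second style would correspond to a transform that takes
a PCollection of queries as input and issues the search inside its
ProcessElement function, instead of starting from PBegin.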

Does that make sense?

Cheers,
Shahar.

------------------------------

Shahar Frank

srf...@gmail.com

+447799561438

------------------------------

On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user <user@beam.apache.org>
wrote:

> I'm running into an issue using the ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how
> to load the .withQuery() parameter which could provide this capability or
> any approach that provides flexibility.
>
>
>
> The issue is that the ElasticsearchIO.read() method expects a PBegin input
> to start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> ElasticsearchIO.read().
>
>
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that
> creates a PCollection with a single element (e.g., PBegin) to simulate the
> beginning of a pipeline or something similar?
>
>
>
> Here is my naive attempt without accepting the reality of PBegin:
>
>     // a PCollection of Elasticsearch queries
>     PCollection<String> queries = ...
>
>     PCollection<String> queryResults = queries.apply(
>         ParDo.of(new DoFn<String, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String query = c.element();
>                 PCollection<String> results = c.pipeline()
>                     .apply(ElasticsearchIO.read()
>                         .withConnectionConfiguration(
>                             ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                         .withQuery(query));
>                 c.output(results);
>             }
>         })
>     .apply(Flatten.pCollections()));
>
>
>
>
>
> In general, I'm wondering whether, for any of the IO-related classes
> provided by Beam that take a PBegin input, there is a means to introduce a
> collection.
>
>
>
> Here is one approach that might be promising:
>
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider =
>     ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll =
>     pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>
> PCollection<String> partitionData = PBegin.in(pipeline)
>         .apply("Read data from Elasticsearch",
>             ElasticsearchIO.read()
>                 .withConnectionConfiguration(connConfig)
>                 .withQuery(ValueProvider<String> pcoll)
>                 .withScrollKeepalive("1m")
>                 .withBatchSize(50))
>         .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>             opt.getMedTagVersion(), opt.getNoteType()));
>
>
>
> Any thoughts or ideas would be great.   Thanks, ~Sean
>
>
>
