Hi Rion,

As you probably already know, for dynamic indices you can simply implement your own ElasticsearchSinkFunction [1], where you can create any request that the Elasticsearch client supports.
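A rough, untested sketch of what that could look like (Event, getTenant() and toJson() are just placeholders for your own event type and serialization):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

public class TenantIndexSinkFunction implements ElasticsearchSinkFunction<Event> {

    @Override
    public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
        // Derive the target index per element, e.g. one index per tenant.
        IndexRequest request = Requests.indexRequest()
                .index("events-" + element.getTenant())
                .source(element.toJson(), XContentType.JSON);
        indexer.add(request);
    }
}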
The tricky part is how to implement dynamic routing into multiple clusters.

- If the Elasticsearch clusters are known upfront (before submitting the job), you can easily create multiple Elasticsearch sinks and prepend each with a simple filter (this is basically what a split operator does). There is a rough sketch of this approach at the very bottom of this mail, below the quoted message.
- If you discover Elasticsearch clusters at runtime, this would require some changes to the current ElasticsearchSink implementation. I think this may actually be as simple as introducing something like a DynamicElasticsearchSink that could dynamically create and manage "child" sinks. This would probably require some thought about how to manage consumed resources (memory), because the number of child sinks could potentially be unbounded. This could of course be simplified if the underlying Elasticsearch client already supported that, which I'm not aware of.

If you'd like to take this path, it would definitely be a great contribution to Flink (I'm able to provide some guidance).

[1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java

Best,
D.

On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi folks,
>
> I have a use-case that I wanted to initially pose to the mailing list as
> I’m not terribly familiar with the Elasticsearch connector, to ensure I’m
> not going down the wrong path trying to accomplish this in Flink (or if
> something downstream might be a better option).
>
> Basically, I have the following pieces to the puzzle:
>
> - A stream of tenant-specific events
> - An HTTP endpoint containing mappings for tenant-specific Elastic
> cluster information (as each tenant has its own specific Elastic
> cluster/index)
>
> What I’m hoping to accomplish is the following:
>
> 1. One stream will periodically poll the HTTP endpoint and store these
> cluster mappings in state (keyed by tenant, with cluster info as the value)
> 2. The event stream will be keyed by tenant and connected to the
> cluster mappings stream.
> 3. I’ll need an Elasticsearch sink that can route the
> tenant-specific event data to its corresponding cluster/index from the
> mapping source.
>
> I know that the existing Elasticsearch sink supports dynamic indices,
> however I didn’t know if it’s possible to adjust the cluster like I would
> need on a per-tenant basis, or if there’s a better approach here?
>
> Any advice would be appreciated.
>
> Thanks,
>
> Rion
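Here is the sketch I mentioned for the "clusters known upfront" case, using the Elasticsearch 7 connector (again untested; Event, getTenant() and TenantIndexSinkFunction are the same placeholders as above, and the host names are made up):

import java.util.Collections;
import org.apache.http.HttpHost;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

public static void attachTenantSinks(DataStream<Event> events) {
    // One sink per known cluster, each fed only by the matching tenant's events.
    ElasticsearchSink.Builder<Event> tenantABuilder = new ElasticsearchSink.Builder<>(
            Collections.singletonList(new HttpHost("tenant-a.es.internal", 9200, "http")),
            new TenantIndexSinkFunction());

    events
        .filter(e -> "tenant-a".equals(e.getTenant()))
        .addSink(tenantABuilder.build());

    // ...the same pattern repeats (or you loop over a tenant -> hosts map) for
    // every other cluster that is known when the job is submitted.
}

In practice you could drive this from the same mapping endpoint you poll over HTTP, as long as the full set of clusters is fixed before the job starts; anything discovered later would fall into the second, DynamicElasticsearchSink-style option.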