Hi Rion,

As you probably already know, for dynamic indices you can simply implement
your own ElasticsearchSinkFunction [1], where you can create any request
that the Elastic client supports.
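
A minimal sketch of the index-selection part, in plain Java so that it runs
without the Flink or Elasticsearch jars. The class name TenantIndexNames, the
"events-" prefix and the daily suffix are assumptions made up for
illustration, not something the connector prescribes. Inside the process()
method of your ElasticsearchSinkFunction you would build the index request
with the returned name and hand it to the indexer.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper: derives a per-tenant, per-day index name. */
final class TenantIndexNames {
    // Formatting an Instant requires a zone; UTC keeps the names stable across hosts.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    private TenantIndexNames() {}

    static String indexFor(String tenantId, Instant eventTime) {
        // Index names must be lowercase; map everything outside a
        // conservative character set to '_' (an assumption, adjust as needed).
        String safeTenant =
                tenantId.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_-]", "_");
        return "events-" + safeTenant + "-" + DAY.format(eventTime);
    }
}
```

For example, TenantIndexNames.indexFor("Acme Corp", eventTime) for an event
on 2021-08-08 (UTC) gives "events-acme_corp-2021.08.08".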

The tricky part is how to implement dynamic routing into multiple clusters.
- If the Elastic clusters are known upfront (before submitting the job), you
can easily create multiple Elastic sinks and put a simple filter in front of
each one (this is basically what the split operator does).
- If you discover Elastic clusters at runtime, this would require some
changes to the current ElasticsearchSink implementation. I think this may
actually be as simple as introducing something like a
DynamicElasticsearchSink that dynamically creates and manages "child"
sinks. This would probably require some thought about how to manage
consumed resources (memory), because the number of child sinks could
potentially be unbounded. This could of course be simplified if the
underlying Elastic client already supports that, which I'm not aware of. If
you'd like to take this path, it would definitely be a great contribution to
Flink (I'm able to provide some guidance).
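
To make the resource concern for the runtime-discovery case concrete, here
is a rough sketch of how child sinks could be kept bounded. It is plain
Java, not Flink code: ChildSink and ChildSinkRegistry are hypothetical names
invented for this example, and least-recently-used eviction is just one
possible policy. A real DynamicElasticsearchSink would also have to deal
with checkpointing and failure handling, which this ignores.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a per-cluster sink; not a Flink interface. */
interface ChildSink extends AutoCloseable {
    void write(String document);

    /** Expected to flush anything still buffered before releasing the client. */
    @Override
    void close();
}

/** Keeps at most maxOpenSinks child sinks open, evicting the least recently used. */
final class ChildSinkRegistry {
    private final int maxOpenSinks;
    private final Function<String, ChildSink> factory;
    // accessOrder = true: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, ChildSink> sinks =
            new LinkedHashMap<>(16, 0.75f, true);

    ChildSinkRegistry(int maxOpenSinks, Function<String, ChildSink> factory) {
        if (maxOpenSinks < 1) {
            throw new IllegalArgumentException("maxOpenSinks must be >= 1");
        }
        this.maxOpenSinks = maxOpenSinks;
        this.factory = factory;
    }

    /** Routes one document to the sink for the given cluster, creating it on demand. */
    void write(String clusterAddress, String document) {
        ChildSink sink = sinks.get(clusterAddress); // also marks it as recently used
        if (sink == null) {
            evictIfFull();
            sink = factory.apply(clusterAddress);
            sinks.put(clusterAddress, sink);
        }
        sink.write(document);
    }

    private void evictIfFull() {
        if (sinks.size() < maxOpenSinks) {
            return;
        }
        Iterator<Map.Entry<String, ChildSink>> it = sinks.entrySet().iterator();
        ChildSink evicted = it.next().getValue();
        it.remove();
        evicted.close();
    }

    int openSinks() {
        return sinks.size();
    }

    /** Closes every remaining child sink, e.g. when the parent sink shuts down. */
    void closeAll() {
        for (ChildSink sink : sinks.values()) {
            sink.close();
        }
        sinks.clear();
    }
}
```

The registry is not thread-safe, so this assumes it is only touched from a
single thread. With a limit of 2, writing to clusters A, B, A and then C
closes B's sink before C's is created.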

[1]
https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java

Best,
D.

On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi folks,
>
> I have a use-case that I wanted to initially pose to the mailing list as
> I’m not terribly familiar with the Elasticsearch connector to ensure I’m
> not going down the wrong path trying to accomplish this in Flink (or if
> something downstream might be a better option).
>
> Basically, I have the following pieces to the puzzle:
>
>    - A stream of tenant-specific events
>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>    cluster information (as each tenant has its own specific Elastic
>    cluster/index)
>
> What I’m hoping to accomplish is the following:
>
>    1. One stream will periodically poll the HTTP endpoint and store these
>    cluster mappings in state (keyed by tenant with cluster info as the value)
>    2. The event stream will be keyed by tenant and connected to the
>    cluster mappings stream.
>    3. I’ll need an Elasticsearch sink that can route the
>    tenant-specific event data to its corresponding cluster/index from the
>    mapping source.
>
> I know that the existing Elasticsearch sink supports dynamic indices,
> however I didn’t know if it’s possible to adjust the cluster like I would
> need on a per-tenant basis or if there’s a better approach here?
>
> Any advice would be appreciated.
>
> Thanks,
>
> Rion
>
