Thanks, it worked. I was confused before because I thought the sink builder was called only once, but it gets called for every batch request. Please correct me if my understanding is wrong.
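
To make the per-record routing concrete, here is a minimal sketch of deriving the daily index name from an event's own timestamp. It is plain Java with no Flink dependency. The class name `DailyIndex`, the method `indexFor`, the `events` prefix, the epoch-millisecond timestamp, and the choice of UTC are all assumptions for illustration, not something taken from the connector. In the actual job I would expect this to be called from the `process` method of the `ElasticsearchSinkFunction` for each element, with the result used as the index of the `IndexRequest`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink or Elasticsearch.
public class DailyIndex {
    // Formats the day as yyyy-MM-dd in UTC
    // (assumption: events carry epoch-millisecond timestamps).
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Builds an index name of the form "<prefix>-<yyyy-MM-dd>" from the event timestamp.
    public static String indexFor(String prefix, long epochMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Two timestamps one millisecond apart, on either side of midnight UTC.
        System.out.println(indexFor("events", 1590710399999L));
        System.out.println(indexFor("events", 1590710400000L));
    }
}
```

Because the name is computed from each record rather than from the wall clock, the first event of a new day lands in a new index without any change to the sink itself, and late events still go to the index for their own day.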

On Fri, May 29, 2020 at 9:08 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, aj
>
> ElasticsearchSink does not create the index itself; it only starts an
> Elastic client for sending requests to the Elastic cluster. You can
> simply extract the index (the date value, in your case) from your
> timestamp field and put it into an IndexRequest [1]. ElasticsearchSink
> will send the IndexRequests to the Elastic cluster, and the cluster
> will create the corresponding index and flush the records.
>
> BTW, if you’re using Flink SQL, you can use a dynamic index in the
> Elasticsearch SQL connector [2]: simply configure
> 'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.
>
> Best,
> Leonard Xu
>
> [1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
> On May 29, 2020, at 02:43, aj <ajainje...@gmail.com> wrote:
>
> Hello All,
>
> I am getting many events in Kafka, and I have written a Flink job that
> sinks the Avro records from Kafka to S3 in Parquet format.
>
> Now I want to sink these records into Elasticsearch, but the only
> challenge is that I want to sink the records into time-based indices.
> Basically, in Elastic, I want to create a per-day index with the date
> as the suffix. So, in the Flink streaming job, if I create an ES sink,
> how do I make the sink start writing to a new index when the first
> event of the day arrives?
>
> Thanks,
> Anuj.

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>