[jira] [Created] (FLINK-32028) Error handling for ElasticSearch sink

2023-05-08 Thread Tudor Plugaru (Jira)
Tudor Plugaru created FLINK-32028:
-

 Summary: Error handling for ElasticSearch sink
 Key: FLINK-32028
 URL: https://issues.apache.org/jira/browse/FLINK-32028
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.16.1
Reporter: Tudor Plugaru


The deprecated ElasticsearchSink supports setting an error handler via a 
[public 
method|https://github.com/apache/flink-connector-elasticsearch/blob/8f75d4e059c09b55cc3a44bab3e64330b1246d27/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSink.java#L216], 
but the new sink does not.

Ideally there would be a way to handle ES-specific exceptions, so that 
individual items can be skipped instead of being retried on ES indefinitely 
and blocking the entire pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-26638) Reintroduce ActionFailureHandler for Elasticsearch sink connectors

2022-03-14 Thread Alexander Preuss (Jira)
Alexander Preuss created FLINK-26638:


 Summary: Reintroduce ActionFailureHandler for Elasticsearch sink 
connectors
 Key: FLINK-26638
 URL: https://issues.apache.org/jira/browse/FLINK-26638
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch
Affects Versions: 1.16.0
Reporter: Alexander Preuss


In FLINK-26281 we found out that users depend on the ActionFailureHandler that 
was not ported over to the new unified Sink. We now want to add the failure 
handler back.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25571) Update Elasticsearch Sink to use decomposed interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25571:
---

 Summary: Update Elasticsearch Sink to use decomposed interfaces
 Key: FLINK-25571
 URL: https://issues.apache.org/jira/browse/FLINK-25571
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / ElasticSearch
Affects Versions: 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24323) Port ElasticSearch Sink to new Unified Sink API (FLIP-143)

2021-09-17 Thread Alexander Preuss (Jira)
Alexander Preuss created FLINK-24323:


 Summary: Port ElasticSearch Sink to new Unified Sink API (FLIP-143)
 Key: FLINK-24323
 URL: https://issues.apache.org/jira/browse/FLINK-24323
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream, Table SQL / API
Affects Versions: 1.15.0
Reporter: Alexander Preuss


We want to port the current ElasticSearch Sink to the new Unified Sink API as 
was done with the [Kafka 
Sink|https://issues.apache.org/jira/browse/FLINK-22902].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19115) Null value fields cannot be ignored in ElasticSearch Sink

2020-09-01 Thread McClone (Jira)
McClone created FLINK-19115:
---

 Summary: Null value fields cannot be ignored in ElasticSearch Sink
 Key: FLINK-19115
 URL: https://issues.apache.org/jira/browse/FLINK-19115
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.11.0
Reporter: McClone


Null value fields cannot be ignored in ElasticSearch Sink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18359) Improve error-log strategy for Elasticsearch sink for large data documentId conflict when using create mode for `insert ignore` semantics

2020-06-18 Thread rinkako (Jira)
rinkako created FLINK-18359:
---

 Summary: Improve error-log strategy for Elasticsearch sink for 
large data documentId conflict when using create mode for `insert ignore` 
semantics
 Key: FLINK-18359
 URL: https://issues.apache.org/jira/browse/FLINK-18359
 Project: Flink
  Issue Type: Improvement
Reporter: rinkako


The story: when a Flink job that ingests a large number of records from a 
data source, processes them, and indexes them into ES with the Elasticsearch 
sink fails, we may restart it from a specific data set containing lots of 
records that were already written to ES.

In this case, `INSERT IGNORE` semantics are necessary, so we call `public 
IndexRequest create(boolean create)` with `true` and ignore the 409 
restStatusCode in a customized ActionRequestFailureHandler to make it work.

But the `BulkProcessorListener` always logs an error event before it calls 
the `failureHandler` in its `afterBulk` method, which produces tons of error 
logs for document-id conflicts that we already know about and handle in the 
customized ActionRequestFailureHandler.

Therefore, wouldn't it be more flexible to leave the error logging to the 
ActionRequestFailureHandler (either the default IgnoringFailureHandler or a 
custom handler)?
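The logging-free skip-on-409 behaviour described here can be sketched with a minimal stand-in interface. Note this is an illustrative, self-contained model with assumed names, not the real connector API (the actual callback is Flink's ActionRequestFailureHandler):

```java
// Minimal self-contained sketch of the skip-on-409 idea.
// "FailureHandler" is an illustrative stand-in for Flink's
// ActionRequestFailureHandler, not the real connector interface.
public class IgnoreConflictsHandlerSketch {

    /** Stand-in for the connector's per-request failure callback. */
    interface FailureHandler {
        /** Returns true if the failed request should be re-added for retry. */
        boolean onFailure(String action, int restStatusCode);
    }

    /** Treats HTTP 409 (document-id conflict) as success; retries everything else. */
    static final FailureHandler IGNORE_CONFLICTS = (action, status) -> {
        if (status == 409) {
            // The document already exists: exactly the `INSERT IGNORE` case.
            // Crucially, nothing is logged here, avoiding the log flood.
            return false;
        }
        return true;
    };

    public static void main(String[] args) {
        System.out.println(IGNORE_CONFLICTS.onFailure("index doc-1", 409)); // false
        System.out.println(IGNORE_CONFLICTS.onFailure("index doc-2", 503)); // true
    }
}
```

The design point is that the decision to log (or not) lives in the handler itself, rather than being forced on every job by the bulk listener.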

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18093) E2E tests manually for Elasticsearch Sink

2020-06-03 Thread Jark Wu (Jira)
Jark Wu created FLINK-18093:
---

 Summary: E2E tests manually for Elasticsearch Sink
 Key: FLINK-18093
 URL: https://issues.apache.org/jira/browse/FLINK-18093
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Jark Wu
 Fix For: 1.11.0


- test Kafka 2 ES with insert-only query
- test Kafka 2 ES with upsert query



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17623) Elasticsearch sink should support user resource cleanup

2020-05-11 Thread Yun Wang (Jira)
Yun Wang created FLINK-17623:


 Summary: Elasticsearch sink should support user resource cleanup
 Key: FLINK-17623
 URL: https://issues.apache.org/jira/browse/FLINK-17623
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: Yun Wang


There should be a way for an 
[ElasticsearchSinkFunction|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java#L61] 
implementation to use resources with the same lifecycle as the Elasticsearch 
sink, for example a 
[RestHighLevelClient|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html].

Currently there is no way to clean up such resources. This can be achieved by 
exposing a `close()` method in the ElasticsearchSinkFunction interface and 
invoking it from 
[ElasticsearchSinkBase.close|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331].
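The proposal amounts to adding a `close()` hook with a no-op default, so existing implementations stay source-compatible. A rough sketch, using simplified stand-in types rather than the real Flink/Elasticsearch classes:

```java
// Sketch of the proposed change. "SinkFunction" and "Client" are simplified
// stand-ins for ElasticsearchSinkFunction and RestHighLevelClient.
public class SinkFunctionCloseSketch {

    /** Stand-in for ElasticsearchSinkFunction with the proposed hook. */
    interface SinkFunction {
        void process(String record);
        /** No-op default keeps existing implementations source-compatible. */
        default void close() {}
    }

    /** Stand-in for a client resource owned by the sink function. */
    static class Client implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        Client client = new Client();
        SinkFunction fn = new SinkFunction() {
            @Override public void process(String record) { /* index via client */ }
            @Override public void close() { client.close(); }
        };
        fn.process("doc");
        fn.close(); // the sink base would invoke this from its own close()
        System.out.println(client.closed); // true
    }
}
```

Making the method a `default` is what avoids breaking the many user implementations of the existing interface.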



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17116) support UsernamePasswordCredentials for elasticsearch sink via SQL DDL

2020-04-13 Thread Kai Chen (Jira)
Kai Chen created FLINK-17116:


 Summary: support UsernamePasswordCredentials for elasticsearch 
sink via SQL DDL
 Key: FLINK-17116
 URL: https://issues.apache.org/jira/browse/FLINK-17116
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
Reporter: Kai Chen
 Fix For: 1.11.0


support UsernamePasswordCredentials for elasticsearch sink via SQL DDL.

For example:
{code:java}

CREATE TABLE es_sink (
...
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'host:port',
  'connector.index' = 'index_name',
  'connector.document-type' = 'type_name',
  'connector.credential.username' = 'es_username',
  'connector.credential.password' = 'es_password'
  ...
);
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16736) Expose Elasticsearch sink metric

2020-03-23 Thread YangLee (Jira)
YangLee created FLINK-16736:
---

 Summary: Expose Elasticsearch sink metric
 Key: FLINK-16736
 URL: https://issues.apache.org/jira/browse/FLINK-16736
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: YangLee


I found it quite useful to expose metrics of the sink operator via the Flink 
metric system for debugging and monitoring purposes, such as:
 * total number of bulk requests
 * total size (sum) of bulk requests
 * total duration (sum) spent on bulk requests

With these metrics, we could derive the average size and duration.

The Elasticsearch SDK already exposes all of this information via 
BulkProcessorListener; a simple bridge should be enough.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16701) Elasticsearch sink support alias for indices.

2020-03-20 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-16701:
--

 Summary: Elasticsearch sink support alias for indices.
 Key: FLINK-16701
 URL: https://issues.apache.org/jira/browse/FLINK-16701
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.11.0
Reporter: Leonard Xu
 Fix For: 1.11.0


This is related to 
[FLINK-15400|https://issues.apache.org/jira/browse/FLINK-15400]. FLINK-15400 
will only support dynamic index and will not support aliases. Because alias 
support is needed in both the Streaming API and the Table API, I think 
splitting the original design into two PRs makes sense.
PR for FLINK-15400:

        support dynamic index for ElasticsearchTableSink
PR for this issue:

        support alias for Streaming API and Table API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16495) Improve default flush strategy for Elasticsearch sink to make it work out-of-box

2020-03-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-16495:
---

 Summary: Improve default flush strategy for Elasticsearch sink to 
make it work out-of-box
 Key: FLINK-16495
 URL: https://issues.apache.org/jira/browse/FLINK-16495
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
Reporter: Jark Wu
 Fix For: 1.11.0


Currently, Elasticsearch sink provides 3 flush options: 

{code:java}
'connector.bulk-flush.max-actions' = '42'
'connector.bulk-flush.max-size' = '42 mb'
'connector.bulk-flush.interval' = '6'
{code}

All of them are optional and have no default value on the Flink side [1]. But 
flush actions and flush size have default values of {{1000}} and {{5mb}} in 
the Elasticsearch client [2]. This results in the surprising behavior that no 
results are output by default, see the user report [3]: the sink has to wait 
for 1000 records, but the test did not produce that many.

This can also be a "problem" in production: in a low-throughput job, some 
data may take a very long time to become visible in Elasticsearch.

In this issue, I propose to have Flink's default values for these 3 options. 

{code:java}
'connector.bulk-flush.max-actions' = '1000'  -- same as the ES client default
'connector.bulk-flush.max-size' = '5mb'      -- same as the ES client default
'connector.bulk-flush.interval' = '5s'       -- ensures results show up even at low throughput
{code}


[1]: 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L357-L356
[2]: 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
[3]: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Should-I-use-a-Sink-or-Connector-Or-Both-td33352.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15400) elasticsearch sink support dynamic index.

2019-12-26 Thread ouyangwulin (Jira)
ouyangwulin created FLINK-15400:
---

 Summary: elasticsearch sink support dynamic index.
 Key: FLINK-15400
 URL: https://issues.apache.org/jira/browse/FLINK-15400
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch
Affects Versions: 1.9.1, 1.9.0, 1.11.0
Reporter: ouyangwulin
 Fix For: 1.11.0


From 
user...@flink.apache.org ([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]): 
because ES 6/7 does not support TTL, users need to clean up indices by 
timestamp, so dynamic index support is a useful feature. Add the property 
'dynamicIndex' as a switch that enables dynamic indexing, the property 
'indexField' to choose the time field to extract, and the property 
'indexInterval' to change the cycle mode.

||Property||Description||Default||Required||
|dynamicIndex|whether the index is dynamic|false (true/false)|false|
|indexField|field to extract the index from|none|required if dynamicIndex is 
true; only the types "timestamp", "date" and "long" are supported|
|indexInterval|cycle mode|d|required if dynamicIndex is true; possible 
values: d (day), w (week), m (month)|
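The index-name derivation this table describes can be sketched in plain Java. The bucketing patterns and the method name below are my own illustrative assumptions, not part of the proposal:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of dynamic-index derivation: extract a timestamp field and bucket
// it by day/week/month, per the indexInterval property described above.
public class DynamicIndexSketch {

    static String indexFor(String baseIndex, long epochMillis, String interval) {
        DateTimeFormatter fmt;
        switch (interval) {
            case "d": fmt = DateTimeFormatter.ofPattern("yyyy.MM.dd"); break; // daily
            case "m": fmt = DateTimeFormatter.ofPattern("yyyy.MM"); break;    // monthly
            case "w": fmt = DateTimeFormatter.ofPattern("yyyy.'w'ww"); break; // weekly
            default: throw new IllegalArgumentException("unknown interval: " + interval);
        }
        return baseIndex + "-"
                + fmt.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        long ts = 1577404800000L; // 2019-12-27T00:00:00Z
        System.out.println(indexFor("logs", ts, "d")); // logs-2019.12.27
    }
}
```

Bucketing the index name this way lets old data be dropped by deleting whole indices, which is the TTL workaround the proposal is after.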

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Flink Elasticsearch Sink Issue

2019-06-21 Thread Ramya Ramamurthy
By default, flushOnCheckpoint is set to true.
So ideally, based on env.enableCheckpointing(30); the flush to ES
should be triggered every 30 seconds, though our ES flush timeout is 60
seconds.
If the above assumption is correct, we still do not see packets
getting flushed until the next packet/batch arrives.

Thanks.

On Fri, Jun 21, 2019 at 6:07 PM Ramya Ramamurthy  wrote:

> Yes, we do maintain checkpoints
> env.enableCheckpointing(30);
>
> But we assumed it is for Kafka consumer offsets. Not sure how this is
> useful in this case? Can you pls. elaborate on this.
>
> ~Ramya.
>
>
>
> On Fri, Jun 21, 2019 at 4:33 PM miki haiat  wrote:
>
>> Did you set some checkpoints  configuration?
>>
>> On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy  wrote:
>>
>> > Hi,
>> >
>> > We use Kafka->Flink->Elasticsearch in our project.
>> > The data to the elasticsearch is not getting flushed, till the next
>> batch
>> > arrives.
>> > E.g.: If the first batch contains 1000 packets, this gets pushed to the
>> > Elastic, only after the next batch arrives [irrespective of reaching the
>> > batch time limit].
>> > Below are the sink configurations we use  currently.
>> >
>> > esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
>> > esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
>> > esSinkBuilder.setBulkFlushInterval(6); // 1 minute once
>> > esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if
>> bulk
>> > fails
>> > esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
>> > esSinkBuilder.setBulkFlushBackoff(true);
>> >
>> > Sink code :
>> > List httpHosts = new ArrayList<>();
>> > //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
>> > httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
>> >
>> > ElasticsearchSink.Builder esSinkBuilder = new
>> > ElasticsearchSink.Builder<>(
>> > httpHosts,
>> > new ElasticsearchSinkFunction() {
>> >
>> > private IndexRequest createIndexRequest(byte[] document, String
>> indexDate)
>> > {
>> >
>> > return new IndexRequest(esIndex + indexDate, esType)
>> > .source(document, XContentType.JSON);
>> >
>> > }
>> >
>> > @Override
>> > public void process(Row r, RuntimeContext runtimeContext, RequestIndexer
>> > requestIndexer) {
>> > byte[] byteArray = serializationSchema.serialize(r);
>> >
>> > ObjectMapper mapper = new ObjectMapper();
>> > ObjectWriter writer = mapper.writer();
>> >
>> > try {
>> > JsonNode jsonNode = mapper.readTree(byteArray);
>> >
>> > long tumbleStart = jsonNode.get("fseen").asLong();
>> >
>> > ZonedDateTime utc =
>> > Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
>> > String indexDate =
>> DateTimeFormatter.ofPattern(".MM.dd").format(utc);
>> >
>> > byte[] document = writer.writeValueAsBytes(jsonNode);
>> >
>> > requestIndexer.add(createIndexRequest(document, indexDate));
>> > } catch (Exception e) {
>> > System.out.println("In the error block");
>> > }
>> >
>> > }
>> > }
>> > );
>> >
>> > Has anyone faced this issue? Any help would be appreciated !!
>> >
>> > Thanks,
>> >
>>
>


Re: Flink Elasticsearch Sink Issue

2019-06-21 Thread Ramya Ramamurthy
Yes, we do maintain checkpoints
env.enableCheckpointing(30);

But we assumed it is for Kafka consumer offsets. Not sure how this is
useful in this case? Can you pls. elaborate on this.

~Ramya.



On Fri, Jun 21, 2019 at 4:33 PM miki haiat  wrote:

> Did you set some checkpoints  configuration?
>
> On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy  wrote:
>
> > Hi,
> >
> > We use Kafka->Flink->Elasticsearch in our project.
> > The data to the elasticsearch is not getting flushed, till the next batch
> > arrives.
> > E.g.: If the first batch contains 1000 packets, this gets pushed to the
> > Elastic, only after the next batch arrives [irrespective of reaching the
> > batch time limit].
> > Below are the sink configurations we use  currently.
> >
> > esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
> > esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
> > esSinkBuilder.setBulkFlushInterval(6); // 1 minute once
> > esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if bulk
> > fails
> > esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
> > esSinkBuilder.setBulkFlushBackoff(true);
> >
> > Sink code :
> > List httpHosts = new ArrayList<>();
> > //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> > httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
> >
> > ElasticsearchSink.Builder esSinkBuilder = new
> > ElasticsearchSink.Builder<>(
> > httpHosts,
> > new ElasticsearchSinkFunction() {
> >
> > private IndexRequest createIndexRequest(byte[] document, String
> indexDate)
> > {
> >
> > return new IndexRequest(esIndex + indexDate, esType)
> > .source(document, XContentType.JSON);
> >
> > }
> >
> > @Override
> > public void process(Row r, RuntimeContext runtimeContext, RequestIndexer
> > requestIndexer) {
> > byte[] byteArray = serializationSchema.serialize(r);
> >
> > ObjectMapper mapper = new ObjectMapper();
> > ObjectWriter writer = mapper.writer();
> >
> > try {
> > JsonNode jsonNode = mapper.readTree(byteArray);
> >
> > long tumbleStart = jsonNode.get("fseen").asLong();
> >
> > ZonedDateTime utc =
> > Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
> > String indexDate = DateTimeFormatter.ofPattern(".MM.dd").format(utc);
> >
> > byte[] document = writer.writeValueAsBytes(jsonNode);
> >
> > requestIndexer.add(createIndexRequest(document, indexDate));
> > } catch (Exception e) {
> > System.out.println("In the error block");
> > }
> >
> > }
> > }
> > );
> >
> > Has anyone faced this issue? Any help would be appreciated !!
> >
> > Thanks,
> >
>


Re: Flink Elasticsearch Sink Issue

2019-06-21 Thread miki haiat
Did you set some checkpoints  configuration?

On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy  wrote:

> Hi,
>
> We use Kafka->Flink->Elasticsearch in our project.
> The data to the elasticsearch is not getting flushed, till the next batch
> arrives.
> E.g.: If the first batch contains 1000 packets, this gets pushed to the
> Elastic, only after the next batch arrives [irrespective of reaching the
> batch time limit].
> Below are the sink configurations we use  currently.
>
> esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
> esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
> esSinkBuilder.setBulkFlushInterval(6); // 1 minute once
> esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if bulk
> fails
> esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
> esSinkBuilder.setBulkFlushBackoff(true);
>
> Sink code :
> List httpHosts = new ArrayList<>();
> //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
>
> ElasticsearchSink.Builder esSinkBuilder = new
> ElasticsearchSink.Builder<>(
> httpHosts,
> new ElasticsearchSinkFunction() {
>
> private IndexRequest createIndexRequest(byte[] document, String indexDate)
> {
>
> return new IndexRequest(esIndex + indexDate, esType)
> .source(document, XContentType.JSON);
>
> }
>
> @Override
> public void process(Row r, RuntimeContext runtimeContext, RequestIndexer
> requestIndexer) {
> byte[] byteArray = serializationSchema.serialize(r);
>
> ObjectMapper mapper = new ObjectMapper();
> ObjectWriter writer = mapper.writer();
>
> try {
> JsonNode jsonNode = mapper.readTree(byteArray);
>
> long tumbleStart = jsonNode.get("fseen").asLong();
>
> ZonedDateTime utc =
> Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
> String indexDate = DateTimeFormatter.ofPattern(".MM.dd").format(utc);
>
> byte[] document = writer.writeValueAsBytes(jsonNode);
>
> requestIndexer.add(createIndexRequest(document, indexDate));
> } catch (Exception e) {
> System.out.println("In the error block");
> }
>
> }
> }
> );
>
> Has anyone faced this issue? Any help would be appreciated !!
>
> Thanks,
>


Flink Elasticsearch Sink Issue

2019-06-21 Thread Ramya Ramamurthy
Hi,

We use Kafka->Flink->Elasticsearch in our project.
The data is not flushed to Elasticsearch until the next batch arrives.
E.g.: if the first batch contains 1000 packets, it is pushed to
Elasticsearch only after the next batch arrives [irrespective of reaching
the batch time limit].
Below are the sink configurations we currently use.

esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
esSinkBuilder.setBulkFlushInterval(6); // 1 minute once
esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if bulk
fails
esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
esSinkBuilder.setBulkFlushBackoff(true);

Sink code :
List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new
ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<Row>() {

private IndexRequest createIndexRequest(byte[] document, String indexDate) {

return new IndexRequest(esIndex + indexDate, esType)
.source(document, XContentType.JSON);

}

@Override
public void process(Row r, RuntimeContext runtimeContext, RequestIndexer
requestIndexer) {
byte[] byteArray = serializationSchema.serialize(r);

ObjectMapper mapper = new ObjectMapper();
ObjectWriter writer = mapper.writer();

try {
JsonNode jsonNode = mapper.readTree(byteArray);

long tumbleStart = jsonNode.get("fseen").asLong();

ZonedDateTime utc =
Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
String indexDate = DateTimeFormatter.ofPattern(".MM.dd").format(utc);

byte[] document = writer.writeValueAsBytes(jsonNode);

requestIndexer.add(createIndexRequest(document, indexDate));
} catch (Exception e) {
System.out.println("In the error block");
}

}
}
);

Has anyone faced this issue? Any help would be appreciated !!

Thanks,
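Two details in the snippet above look mangled by the mail archive and are worth double-checking against the actual job. The following self-contained sketch shows what they most plausibly should be; both are assumptions on my part:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// 1. setBulkFlushInterval takes milliseconds, so "1 minute once" would be
//    60_000, not 6 (the archive has likely stripped digits).
// 2. The index-date pattern is presumably "yyyy.MM.dd"; the ".MM.dd" shown
//    above would silently drop the year from the index name.
public class FlushConfigCheck {
    static final long ONE_MINUTE_MILLIS = 60 * 1000; // value for setBulkFlushInterval

    static final DateTimeFormatter INDEX_DATE =
            DateTimeFormatter.ofPattern("yyyy.MM.dd"); // e.g. 2019.06.21

    public static void main(String[] args) {
        Instant t = Instant.parse("2019-06-21T10:15:00Z");
        System.out.println(INDEX_DATE.format(t.atZone(ZoneOffset.UTC))); // 2019.06.21
        System.out.println(ONE_MINUTE_MILLIS); // 60000
    }
}
```

With flushOnCheckpoint enabled, enabling checkpointing (in milliseconds as well) is what bounds how long a small batch can sit unflushed.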


Re: Elasticsearch Sink

2018-05-18 Thread Christophe Jolif
>> >
>> > Thanks for your feedback (and Flavio for your support!)
>> >
>> > About your remarks/questions:
>> >
>> > - Maybe we can consider removing support for ES 1.x and 2.x starting
>> > from 1.6. Those are very old ES versions (considering that ES 6.x has
>> > already been out for a while). Do you think this would simplify how our
>> > base module APIs are designed?
>> >
>> > I would tend to say it should not change drastically the picture but
>> > would
>> > have to look into it.
>> >
>> > - Wouldn't it be possible to have a REST implementation of the
>> >
>> > `ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If
>> > so,
>> > once we remove ES 1.x and 2.x, it might actually be possible to
>> > completely
>> > replace the current `elasticsearch-base` module.
>> >
>> > The High level REST API was introduced in Elasticsearch 5.6 so it is not
>> > possible to cover 5.5 and below with it.
>> >
>> > If all the necessary APIs are already there (to be double-checked), it
>> > should be able to cover 5.6. What I noticed when working on the PRs is
>> > that the 6.2 REST High Level client API was improved to be closer to
>> > the original APIs; if we want to support 5.6 with it, we might have to
>> > rely on APIs they have already improved since then. Not dramatic. But
>> > is it worth it, knowing this would just give us 5.6 and not 5.2-5.5?
>> >
>> > Now on moving forward I read:
>> >
>> > I'm definitely a +1 to try to move this forward with a proper fix.
>> >
>> >
>> > and
>> >
>> > Working around that would require introducing a new base module
>> >
>> > specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't
>> a
>> > nice way to go.
>> >
>> > So if I read you correctly you are ok moving with a proper fix but it
>> > should not introduce a new (REST based) base module? Then to be honest
>> > I'm
>> > not sure how to proceed :) Any more specific feedback on the direction
>> to
>> > follow would be great!
>> >
>> > Thanks,
>> > --
>> > Christophe
>> >
>> > On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org>
>> >
>> > wrote:
>> >
>> > Hi Christophe,
>> >
>> > Thanks for bringing this up.
>> >
>> > Yes, the main issue with the existing PRs and preventing it from moving
>> > forward is how it currently breaks initial assumptions of APIs in the
>> > `elasticsearch-base` module.
>> > Working around that would require introducing a new base module
>> > specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't
>> a
>> > nice way to go.
>> >
>> > I had a quick stab at the REST API, and it seems to be promising,
>> > especially given that you mentioned that starting from next versions,
>> the
>> > current API we use will be fully removed.
>> > I'm definitely a +1 to try to move this forward with a proper fix.
>> >
>> > Some other remarks / questions I have:
>> > - Maybe we can consider removing support for ES 1.x and 2.x starting
>> > from 1.6. Those are very old ES versions (considering that ES 6.x has
>> > already been out for a while). Do you think this would simplify how our
>> > base module APIs are designed?
>> > - Wouldn't it be possible to have a REST implementation of the
>> > `ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If
>> > so,
>> > once we remove ES 1.x and 2.x, it might actually be possible to
>> > completely
>> > replace the current `elasticsearch-base` module.
>> >
>> > Cheers,
>> > Gordon
>> >
>> >
>> > On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier <
>> pomperma...@okkam.it
>> >
>> >
>> >
>> > wrote:
>> >
>> > +1. Totally agree
>> >
>> > On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > The Flink Elasticsearch sink has been broken for Elasticsearch 5.x for
>> > quite some time (nearly a year):
>> >
>> > https://issues.apache.org/jira/browse/FLINK-7386
>> >

Re: Elasticsearch Sink

2018-05-16 Thread Tzu-Li (Gordon) Tai
> So if I read you correctly you are ok moving with a proper fix but it
> should not introduce a new (REST based) base module? Then to be honest
> I'm not sure how to proceed :) Any more specific feedback on the
> direction to follow would be great!
>
> Thanks,
> --
> Christophe
>
> On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>
> wrote:
>
> Hi Christophe,
>
> Thanks for bringing this up.
>
> Yes, the main issue with the existing PRs and preventing it from moving
> forward is how it currently breaks initial assumptions of APIs in the
> `elasticsearch-base` module.
> Working around that would require introducing a new base module
> specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
> nice way to go.
>
> I had a quick stab at the REST API, and it seems to be promising,
> especially given that you mentioned that starting from next versions, the
> current API we use will be fully removed.
> I'm definitely a +1 to try to move this forward with a proper fix.
>
> Some other remarks / questions I have:
> - Maybe we can consider removing support for ES 1.x and 2.x starting from
> 1.6. Those are very old ES versions (considering that ES 6.x has already
> been out for a while). Do you think this would simplify how our base module
> APIs are designed?
> - Wouldn't it be possible to have a REST implementation of the
> `ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If
> so,
> once we remove ES 1.x and 2.x, it might actually be possible to
> completely
> replace the current `elasticsearch-base` module.
>
> Cheers,
> Gordon
>
>
> On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier <pomperma...@okkam.it
>
>
>
> wrote:
>
> +1. Totally agree
>
> On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:
>
> Hi all,
>
> The Flink Elasticsearch sink has been broken for Elasticsearch 5.x for
> quite some time (nearly a year):
>
> https://issues.apache.org/jira/browse/FLINK-7386
>
> And there is no support for Elasticsearch 6.x:
>
> https://issues.apache.org/jira/browse/FLINK-8101
>
> However several PRs were issued:
>
> https://github.com/apache/flink/pull/4675
> https://github.com/apache/flink/pull/5374
>
> I also raised the issue on the mailing list in the 1.5 timeframe:
>
> http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/DISCUSS-Releasing-Flink-1-5-0-td20867.html#a20905
>
> But things are still not really moving. However, this seems to be
> something people are looking for, so I would really like the community
> to consider that for 1.6.
>
> The problems I see from comments on the PRs:
>
> - getting something that follows the Flink APIs initially created is a
> nightmare, because Elastic is pretty good at breaking compatibility the
> hard way (see in particular, in the last PR, the casts that have to be
> made to get an API that works in all cases)
> - Elasticsearch is moving away from their "native" API, which Flink is
> using, to the REST APIs, so there is little common ground between pre-6
> and post-6 even if Elasticsearch tried to keep some level of
> compatibility in the APIs.
>
> My fear is that by trying to kill two birds with one stone, we actually
> get nothing done.
>
> In the hope of moving that forward, I would like to propose for 1.6 a
> new Elasticsearch 6.x+ sink that would follow the design of the previous
> ones BUT only leverage the new REST API and not inherit from existing
> classes.
>
> It would really be close to what is in my previous PR:
> https://github.com/apache/flink/pull/5374 but just focus on ES 6+/REST
> and so avoid any "strange" cast.
>
> This would not fill the gap of 5.2+ not working, but at least we would
> be back on track with something for the future, as the REST API is where
> Elastic is going.
>
> If people feel there is actual interest and a chance this can be merged,
> I'll be working on issuing a new PR around that.
>
> The alternative is to get back to working on the existing PR, but it
> seems to be a dead end at the moment and not necessarily the best option
> long term, as Elasticsearch is looking into promoting the REST API
> anyway.
>
> Please let me know what you think?
>
> --
> Christophe
>




Re: Elasticsearch Sink

2018-05-16 Thread Christophe Jolif
> > already
> > improved since then. Not dramatic. But is it worth it, knowing this
> > would just give us 5.6 and not 5.2-5.5?
> >
> > Now on moving forward I read:
> >
> > I'm definitely a +1 to try to move this forward with a proper fix.
> >
> >
> > and
> >
> > Working around that would require introducing a new base module
> >
> > specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't
> a
> > nice way to go.
> >
> > So if I read you correctly you are ok moving with a proper fix but it
> > should not introduce a new (REST based) base module? Then to be honest
> > I'm
> > not sure how to proceed :) Any more specific feedback on the direction
> to
> > follow would be great!
> >
> > Thanks,
> > --
> > Christophe
> >
> > On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> > [...]


Re: Elasticsearch Sink

2018-05-15 Thread Tzu-Li (Gordon) Tai
> I'm definitely a +1 to try to move this forward with a proper fix.
>
> Some other remarks / questions I have:
> - Maybe we can consider removing support for ES 1.x and 2.x starting from
> 1.6. Those are very old ES versions (considering that ES 6.x has already
> been out for a while). Do you think this would simplify how our base
> module APIs are designed?
> - Wouldn't it be possible to have a REST implementation of the
> `ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If
> so, once we remove ES 1.x and 2.x, it might actually be possible to
> completely replace the current `elasticsearch-base` module.
>
> Cheers,
> Gordon
>  
>  
> On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> +1. Totally agree
>
> On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:
>
> [...]



--  
Christophe  


Re: Elasticsearch Sink

2018-05-15 Thread Christophe Jolif
> The High level REST API was introduced in Elasticsearch 5.6 so it is not
> possible to cover 5.5 and below with it.
>
> If all the necessary APIs are already here (to be double checked) it
> should be able to cover 5.6. What I noticed when working on the PRs is
> that the 6.2 REST High Level client API was improved to be closer to the
> original APIs; if we want to support 5.6 with it we might have to rely on
> APIs they already improved since then. Not dramatic. But is it worth it
> knowing this would just be giving us 5.6, not 5.2, 5.3, 5.4 and 5.5?
>
> Now on moving forward I read:
>
> I'm definitely a +1 to try to move this forward with a proper fix.
>
>
> and
>
> Working around that would require introducing a new base module
>
> specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
> nice way to go.
>
> So if I read you correctly you are ok moving with a proper fix but it
> should not introduce a new (REST based) base module? Then to be honest
> I'm
> not sure how to proceed :) Any more specific feedback on the direction to
> follow would be great!
>
> Thanks,
> --
> Christophe
>
> On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> [...]



-- 
Christophe


Re: Elasticsearch Sink

2018-05-14 Thread Tzu-Li (Gordon) Tai
Yes, the main issue with the existing PRs and preventing it from moving
forward is how it currently breaks initial assumptions of APIs in the
`elasticsearch-base` module.
Working around that would require introducing a new base module
specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
nice way to go.

I had a quick stab at the REST API, and it seems to be promising,
especially given that you mentioned that starting from next versions, the
current API we use will be fully removed.
I'm definitely a +1 to try to move this forward with a proper fix.

Some other remarks / questions I have:
- Maybe we can consider removing support for ES 1.x and 2.x starting from
1.6. Those are very old ES versions (considering that ES 6.x has already
been out for a while). Do you think this would simplify how our base
module APIs are designed?
- Wouldn't it be possible to have a REST implementation of the
`ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If so,
once we remove ES 1.x and 2.x, it might actually be possible to completely
replace the current `elasticsearch-base` module.

Cheers,  
Gordon  


On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

+1. Totally agree

On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:

[...]


Re: Elasticsearch Sink

2018-05-14 Thread Steve Blackmon
It seems to me that if the transport client dependency is removed, the same
module could perform inserts, updates, and deletes via the HTTP bulk API,
and whatever version differences exist with that API could be handled
inside the module without any difference to the classpath of the pipeline.

If that's true, there's no need or benefit to deprecating support for
earlier Elasticsearch versions so long as someone is willing to implement,
test, and maintain them.

Steve
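For context, the HTTP `_bulk` endpoint takes a newline-delimited JSON
(NDJSON) body: one action line per document, followed by its source line.
A minimal pure-JDK sketch of building such a payload; the `BulkPayload`
class and the index/type names here are made up for illustration and are
not part of any Flink module:

```java
import java.util.List;

/** Hypothetical helper (not part of Flink): builds the NDJSON body that
 *  Elasticsearch's HTTP _bulk endpoint expects. */
public class BulkPayload {

    /** Emits one action line plus one source line per document.
     *  Every line, including the last, must end with a newline. */
    public static String indexActions(String index, String type, List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type).append("\"}}\n");
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        String body = indexActions("my-index", "doc",
                List.of("{\"user\":\"a\"}", "{\"user\":\"b\"}"));
        // POST this body to http://<host>:9200/_bulk with
        // Content-Type: application/x-ndjson
        System.out.print(body);
    }
}
```

The point being that the payload format itself is the same across 5.x and
6.x, so version differences could indeed stay inside the module.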

On 5/13/18 at 2:00 PM, Christophe wrote:

[...]



Re: Elasticsearch Sink

2018-05-13 Thread Christophe Jolif
Hi Gordon,

Thanks for your feedback (and Flavio for your support!)

About your remarks/questions:

> - Maybe we can consider removing support for ES 1.x and 2.x starting from
1.6. Those are very old ES versions (considering that ES 6.x has already
been out for a while). Do you think this would simplify how our base module
APIs are designed?

I would tend to say it should not change drastically the picture but would
have to look into it.

> - Wouldn't it be possible to have a REST implementation of the
`ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If so,
once we remove ES 1.x and 2.x, it might actually be possible to completely
replace the current `elasticsearch-base` module.

The High level REST API was introduced in Elasticsearch 5.6 so it is not
possible to cover 5.5 and below with it.

If all the necessary APIs are already here (to be double checked) it should
be able to cover 5.6. What I noticed when working on the PRs is that the
6.2 REST High Level client API was improved to be closer to the original
APIs; if we want to support 5.6 with it we might have to rely on APIs they
already improved since then. Not dramatic. But is it worth it knowing this
would just be giving us 5.6, not 5.2, 5.3, 5.4 and 5.5?

Now on moving forward I read:

> I'm definitely a +1 to try to move this forward with a proper fix.

and

> Working around that would require introducing a new base module
specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
nice way to go.

So if I read you correctly you are ok moving with a proper fix but it
should not introduce a new (REST based) base module? Then to be honest I'm
not sure how to proceed :) Any more specific feedback on the direction to
follow would be great!

Thanks,
--
Christophe
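Since the 5.6 cutoff keeps coming up in this thread, here is a tiny sketch
of the version gate it implies. The class and method names are invented for
illustration; the only fact taken from the thread is that the high-level
REST client can only target Elasticsearch 5.6 and later:

```java
/** Hypothetical helper (for illustration only): decides whether a cluster
 *  version can be targeted with the high-level REST client, which this
 *  thread notes was introduced in Elasticsearch 5.6. */
public class EsVersionGate {

    public static boolean supportsHighLevelRest(String version) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        // 6.x and later always qualify; 5.x only from 5.6 onwards.
        return major > 5 || (major == 5 && minor >= 6);
    }

    public static void main(String[] args) {
        System.out.println(supportsHighLevelRest("6.2.0")); // true
        System.out.println(supportsHighLevelRest("5.6.3")); // true
        System.out.println(supportsHighLevelRest("5.2.0")); // false
    }
}
```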

On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> [...]

Re: Elasticsearch Sink

2018-05-12 Thread Tzu-Li (Gordon) Tai
Hi Christophe,

Thanks for bringing this up.

Yes, the main issue with the existing PRs and preventing it from moving
forward is how it currently breaks initial assumptions of APIs in the
`elasticsearch-base` module.
Working around that would require introducing a new base module
specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
nice way to go.

I had a quick stab at the REST API, and it seems to be promising,
especially given that you mentioned that starting from next versions, the
current API we use will be fully removed.
I'm definitely a +1 to try to move this forward with a proper fix.

Some other remarks / questions I have:
- Maybe we can consider removing support for ES 1.x and 2.x starting from
1.6. Those are very old ES versions (considering that ES 6.x has already
been out for a while). Do you think this would simplify how our base module
APIs are designed?
- Wouldn't it be possible to have a REST implementation of the
`ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If so,
once we remove ES 1.x and 2.x, it might actually be possible to completely
replace the current `elasticsearch-base` module.

Cheers,
Gordon


On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> +1. Totally agree
>
> On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:
>
> > [...]


Re: Elasticsearch Sink

2018-05-12 Thread Flavio Pompermaier
+1. Totally agree

On Sat, 12 May 2018, 18:14 Christophe Jolif, <cjo...@gmail.com> wrote:

> Hi all,
>
> There is quite some time Flink Elasticsearch sink is broken for
> Elastisearch 5.x  (nearly a year):
>
> https://issues.apache.org/jira/browse/FLINK-7386
>
> And there is no support for Elasticsearch 6.x:
>
> https://issues.apache.org/jira/browse/FLINK-8101
>
> However several PRs were issued:
>
> https://github.com/apache/flink/pull/4675
> https://github.com/apache/flink/pull/5374
>
> I also raised the issue on the mailing list in the 1.5 timeframe:
>
> http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/DISCUSS-Releasing-Flink-1-5-0-td20867.html#a20905
>
> But things are still not really moving. However this seems something people
> are looking for, so I would really like the community to consider that for
> 1.6.
>
> The problems I see from comments on the PRs:
>
> - getting something that is following the Flink APIs initially created is a
> nightmare because Elastic is pretty good at breaking compatibility the hard
> way (see in particular in the last PR the cast that have to be made to get
> an API that works in all cases)
> - Elasticsearch is moving away from their "native" API Flink is using to
> the REST APIs so there is little  common ground between pre 6 and post 6
> even if Elasticsearch tried to get some level of compatibility in the APIs.
>
> My fear is that by trying to kill two birds with one stone, we actually get
> nothing done.
>
> In the hope of moving that forward I would like to propose for 1.6 a new
> Elasticsearch 6.x+ sink that would follow the design of the previous ones
> BUT only leverage the new REST API and not inherit from existing classes.
> It would really be close to what is in my previous PR:
> https://github.com/apache/flink/pull/5374 but just focus on E6+/REST and
> so
> avoid any "strange" cast.
>
> This would not fill the gap of the 5.2+ not working but at least we would
> be back on track with something for the future as REST API is where Elastic
> is going.
>
> If people feel there is actual interest and a chance this can be merged, I'll
> be working on issuing a new PR around that.
>
> Alternative is to get back working on the existing PR but it seems to be a
> dead-end at the moment and not necessarily the best option long term as
> anyway Elasticsearch is looking into promoting the REST API.
>
> Please let me know what you think?
>
> --
> Christophe
>


Elasticsearch Sink

2018-05-12 Thread Christophe Jolif
Hi all,

The Flink Elasticsearch sink has been broken for Elasticsearch 5.x for
quite some time (nearly a year):

https://issues.apache.org/jira/browse/FLINK-7386

And there is no support for Elasticsearch 6.x:

https://issues.apache.org/jira/browse/FLINK-8101

However several PRs were issued:

https://github.com/apache/flink/pull/4675
https://github.com/apache/flink/pull/5374

I also raised the issue on the mailing list in the 1.5 timeframe:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-5-0-td20867.html#a20905

But things are still not really moving. However this seems something people
are looking for, so I would really like the community to consider that for
1.6.

The problems I see from comments on the PRs:

- getting something that follows the Flink APIs as initially created is a
nightmare because Elastic is pretty good at breaking compatibility the hard
way (see in particular in the last PR the casts that have to be made to get
an API that works in all cases)
- Elasticsearch is moving away from the "native" API Flink is using to
the REST APIs, so there is little common ground between pre-6 and post-6,
even though Elasticsearch tried to keep some level of compatibility in the APIs.

My fear is that by trying to kill two birds with one stone, we actually get
nothing done.

In the hope of moving that forward I would like to propose for 1.6 a new
Elasticsearch 6.x+ sink that would follow the design of the previous ones
BUT only leverage the new REST API and not inherit from existing classes.
It would really be close to what is in my previous PR:
https://github.com/apache/flink/pull/5374 but just focus on E6+/REST and so
avoid any "strange" cast.

This would not fill the gap of the 5.2+ not working but at least we would
be back on track with something for the future as REST API is where Elastic
is going.

If people feel there is actual interest and a chance this can be merged, I'll
be working on issuing a new PR around that.

Alternative is to get back working on the existing PR but it seems to be a
dead-end at the moment and not necessarily the best option long term as
anyway Elasticsearch is looking into promoting the REST API.

Please let me know what you think?

-- 
Christophe


[jira] [Created] (FLINK-7697) Add metrics for Elasticsearch Sink

2017-09-27 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7697:
-

 Summary: Add metrics for Elasticsearch Sink
 Key: FLINK-7697
 URL: https://issues.apache.org/jira/browse/FLINK-7697
 Project: Flink
  Issue Type: Wish
  Components: ElasticSearch Connector
Reporter: Hai Zhou UTC+8
Priority: Critical


We should add metrics to track events written by the ElasticsearchSink,
e.g.:
* number of successful bulk sends
* number of documents inserted
* number of documents updated
* number of documents version conflicts
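A minimal sketch of the counters such metrics could track. This uses plain JDK types so it stands alone; the class, enum, and method names (SinkMetrics, Outcome, onBulkCompleted) are illustrative stand-ins, not Flink or Elasticsearch API — in a real sink these would be registered with Flink's metric group and updated from the bulk listener's afterBulk callback.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch of per-sink counters; names are hypothetical.
public class SinkMetrics {
    // Minimal stand-in for the outcome of one item in a bulk response.
    public enum Outcome { INSERTED, UPDATED, VERSION_CONFLICT }

    public final LongAdder successfulBulkSends = new LongAdder();
    public final LongAdder documentsInserted = new LongAdder();
    public final LongAdder documentsUpdated = new LongAdder();
    public final LongAdder versionConflicts = new LongAdder();

    // Would be invoked once per successfully sent bulk request,
    // with the per-item outcomes parsed from the bulk response.
    public void onBulkCompleted(List<Outcome> items) {
        successfulBulkSends.increment();
        for (Outcome o : items) {
            switch (o) {
                case INSERTED: documentsInserted.increment(); break;
                case UPDATED: documentsUpdated.increment(); break;
                case VERSION_CONFLICT: versionConflicts.increment(); break;
            }
        }
    }
}
```

LongAdder is used rather than a plain long because the bulk listener callback may run on a different thread than the one reading the metrics.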




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-5353) Elasticsearch Sink loses well-formed documents when there are malformed documents

2016-12-16 Thread Flavio Pompermaier (JIRA)
Flavio Pompermaier created FLINK-5353:
-

 Summary: Elasticsearch Sink loses well-formed documents when there 
are malformed documents
 Key: FLINK-5353
 URL: https://issues.apache.org/jira/browse/FLINK-5353
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.1.3
Reporter: Flavio Pompermaier








[jira] [Created] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-21 Thread static-max (JIRA)
static-max created FLINK-5122:
-

 Summary: Elasticsearch Sink loses documents when cluster has high 
load
 Key: FLINK-5122
 URL: https://issues.apache.org/jira/browse/FLINK-5122
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.2.0
Reporter: static-max


My cluster was under high load and documents did not get indexed. This violates 
the "at-least-once" semantics of the ES connector.

I put pressure on my cluster to test Flink, causing new indices to be created 
and rebalanced. On such errors the bulk request should be retried instead of 
being discarded.

Primary shard not active because ES decided to rebalance the index:
2016-11-15 15:35:16,123 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
UnavailableShardsException[[index-name][3] primary shard is not active Timeout: 
[1m], request: [BulkShardRequest to [index-name] containing [20] requests]]

Bulk queue on node full (I set queue to a low value to reproduce error):
22:37:57,702 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
 nested: EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@727e677c on 
EsThreadPoolExecutor[bulk, queue capacity = 1, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, 
pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];

I can try to propose a PR for this.
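A hedged sketch of the retry classification suggested above: treat the two load-related failures from the log excerpts as transient and retryable. Matching exceptions by simple class name keeps the example free of Elasticsearch dependencies; the class and method names (BulkRetryPolicy, shouldRetry) are illustrative, not the connector's actual API.

```java
import java.util.Set;

// Illustrative sketch, not Flink API: decide whether a failed bulk request
// hit a transient, load-related error and should be retried rather than
// discarded. The exception names match the log excerpts above.
public class BulkRetryPolicy {
    private static final Set<String> TRANSIENT_FAILURES = Set.of(
            "EsRejectedExecutionException",  // bulk queue on a node is full
            "UnavailableShardsException");   // primary shard not active (rebalancing)

    public static boolean shouldRetry(String exceptionSimpleName) {
        return TRANSIENT_FAILURES.contains(exceptionSimpleName);
    }
}
```

A real handler would also cap the number of retries and back off between attempts, so a permanently failing bulk cannot block the pipeline indefinitely.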


