Hi,
The Flink Elasticsearch Sink uses the Elasticsearch Java client to send the
indexing requests, so whatever the client supports, it will be achievable
through the `ElasticsearchSinkFunction` also.
From a quick check at the Elasticsearch Javadocs, I think you can also just set
the document json as a String in the created `IndexRequest`. So,
public IndexRequest createIndexRequest(String element){
HashMap esJson = new HashMap<>();
esJson.put("data", element);
return Requests
.indexRequest()
.index("logs")
.type("object")
.source(esJson);
}
Here, if `element` is already a Json string representing the document, you can
just do
return Requests
.indexRequest()
.index(“logs”)
.type(“object”)
.source(“the Json String”);
The `.source(…)` method has quite a few variants on how to set the source, and
providing a Map is only one of them.
Please refer to the Elasticsearch Javadocs for the full list
(https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/5.2.1).
Hope this helps!
Cheers,
Gordon
On February 21, 2017 at 5:43:36 PM, Fábio Dias (fabiodio...@gmail.com) wrote:
Hi,
thanks for the reply.
There isn't other way to do that?
Using REST you can send json like this :
curl -XPOST 'localhost:9200/customer/external?pretty' -H 'Content-Type:
application/json' -d'
{
"name": "Jane Doe"
}
'
In my case I have json like this:
{
"filters" : {
"id" : 1,
"name": "abc"
}
}
how can I treat this cases? There isn't a way to send all the json element and
index it like the in the REST request?
Thanks.
Tzu-Li (Gordon) Tai escreveu no dia terça, 21/02/2017 às
07:54:
Hi,
I’ll use your code to explain.
public IndexRequest createIndexRequest(String element){
HashMap esJson = new HashMap<>();
esJson.put("data", element);
What you should do here is parse the field values from `element`, and simply
treat them as key-value pairs of the `esJson` map.
So, the `esJson` should be prepared by doing:
esJson.put(“id”, 6);
esJson.put(“name”, “A green door”);
esJson.put(“price”, 12.5);
etc.
Cheers,
Gordon
On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote:
Hi,
I'm using Flink and Elasticsearch and I want to recieve in elasticsearch a json
object ({"id":1, "name":"X"} ect...), I already have a string with this
information, but I don't want to save it as string.
I recieve this:
{
"_index": "logs",
"_type": "object",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": "{\"id\":6,\"name\":\"A green
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
}
}
And I want to recieve this:
{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}
my java code:
try {
ArrayList transports = new ArrayList<>();
transports.add(new InetSocketAddress("127.0.0.1", 9300));
ElasticsearchSinkFunction indexLog = new
ElasticsearchSinkFunction() {
private static final long serialVersionUID = 8802869701292023100L;
public IndexRequest createIndexRequest(String element){
HashMap esJson = new HashMap<>();
esJson.put("data", element);
return Requests
.indexRequest()
.index("logs")
.type("object")
.source(esJson);
}
@Override
public void process(String element, RuntimeContext ctx,
RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
};
ElasticsearchSink esSink = new
ElasticsearchSink(config, transports, indexLog);
input.addSink(esSink);
}
catch (Exception e) {
System.out.println(e);
}
Do I need to treat every entry as a map? Can I just send a object with key
value?
Thanks.