Hi Martin,

You can do that by adding a dependency on the Elasticsearch client, at the 
version you want, directly to your project.
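
As a rough sketch of what that could look like in your pom.xml (assuming the 
standard org.elasticsearch:elasticsearch Maven coordinates and the 2.4.1 
server version mentioned further down in this thread; adjust to your setup), 
you would declare the client next to the connector. Because Maven prefers a 
directly declared dependency over a transitive one, this version should win 
over the one the connector pulls in:

```xml
<!-- Existing connector dependency, as quoted later in this thread -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

<!-- Assumption: a direct dependency on the Elasticsearch client, pinned to
     the version of the server you are connecting to (2.4.1 here) -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>2.4.1</version>
</dependency>
```

No change to the connector's own code should be needed for this.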

You can also check which Elasticsearch client version the project is using by 
running `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon

On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a 
packaging problem. That said, I added the plugin from the link provided, but 
I'm not sure which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only thing I'm currently 
using is the flink-connector; do I have to modify its code?

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention: I can only modify things locally and pack them 
into a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm 
running things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

This could be an Elasticsearch server / client version conflict, or the uber 
jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a 
weird error message that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the Java example from the website. I double-checked that 
I can reach Elasticsearch from the development machine by putting some data 
in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{
  "name" : "hopsworks",
  "cluster_name" : "hops",
  "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
  "version" : {
    "number" : "2.4.1",
    "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
    "build_timestamp" : "2016-09-27T18:57:55Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would
// be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
