[ https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308330#comment-16308330 ]

Bhaskar Divya edited comment on FLINK-8334 at 1/2/18 4:37 PM:
--------------------------------------------------------------

Thanks [~aljoscha].

I think I got it working now (no more failures on start of the job).
My POM got messed up while I was trying to fix this issue, so I re-created a fresh POM
from the Maven archetype for 1.4.0 and added the dependencies I required one by one.
There were also issues with the Elasticsearch Docker environment.
For anybody looking at this, please go through [this link|https://stackoverflow.com/questions/41192680/update-max-map-count-for-elasticsearch-docker-container-mac-host#41251595] to set {{vm.max_map_count}}.
Also, set the following environment variables in the Elasticsearch Docker container:
http.host=0.0.0.0
transport.host=0.0.0.0
xpack.security.enabled=false
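
Put together, here is a minimal sketch of that setup (assuming the official 5.1.2 image from docker.elastic.co; the container name and ports are just examples):
{noformat}
# On the Docker host (on Mac, inside the docker-machine VM), raise the mmap limit
# as described in the Stack Overflow link above:
sudo sysctl -w vm.max_map_count=262144

# Start Elasticsearch 5.1.2 with the environment variables above:
docker run -d --name elasticsearch \
    -p 9200:9200 -p 9300:9300 \
    -e "http.host=0.0.0.0" \
    -e "transport.host=0.0.0.0" \
    -e "xpack.security.enabled=false" \
    docker.elastic.co/elasticsearch/elasticsearch:5.1.2
{noformat}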

Also, here are the POM changes which are working for me:
1) Add the dependency in the outer dependencies block:
{noformat}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
{noformat}

2) Follow the guidelines from [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/linking.html] and add the provided {{<plugin>}} configuration. Inside its {{<artifactItems>}} I added the following for Elasticsearch (a sketch of the enclosing plugin configuration follows the block below):

{noformat}
<!-- Custom Added for ElasticSearch -->
<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>1.4.0</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/flink/**</includes>
</artifactItem>

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.1.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>

<artifactItem>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty3-client</artifactId>
    <version>5.1.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>
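
<!-- Note: io.netty:netty 3.x below is the jar that actually contains the
     org.jboss.netty classes (including the missing
     SocketSendBufferPool$GatheringSendBuffer), hence unpacking it as well. -->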

<artifactItem>
    <groupId>io.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.10.6.Final</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>
<!-- /Custom Added for ElasticSearch -->
{noformat}
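
For reference, the enclosing configuration from the linked page looks roughly like this (I'm quoting the docs from memory, so double-check the exact plugin version there); the {{<artifactItem>}} entries above go inside {{<artifactItems>}}:
{noformat}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.9</version>
    <executions>
        <execution>
            <id>unpack</id>
            <!-- runs just before packaging -->
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- the connector and its dependencies, as listed above -->
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
{noformat}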

A few of these are probably extras and not actually required.
Hope this helps anyone trying the Elasticsearch connector with Docker.


> Elasticsearch Connector throwing java.lang.ClassNotFoundException: 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8334
>                 URL: https://issues.apache.org/jira/browse/FLINK-8334
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.4.0
>         Environment: Using Elasticsearch 5.1.2 in a Docker environment
> Flink is deployed in a separate Docker container
>            Reporter: Bhaskar Divya
>              Labels: elasticsearch, netty
>
> I have an Elasticsearch sink configured. When a job is submitted, it goes into
> FAILED status within a few seconds.
> The following is the exception from the job screen:
> {code:java}
> java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
>       at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80)
>       at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> The following stack trace is shown in the logs:
> {code}
> 2018-01-01 12:15:14,432 INFO  org.elasticsearch.client.transport.TransportClientNodesService  - failed to get node info for {#transport#-1}{8IZTMPcSRCyKRynhfyN2fA}{192.168.99.100}{192.168.99.100:9300}, disconnecting...
> NodeDisconnectedException[[][192.168.99.100:9300][cluster:monitor/nodes/liveness] disconnected]
> 2018-01-01 12:15:19,433 ERROR org.elasticsearch.transport.netty3.Netty3Utils                - fatal error on the network layer
>       at org.elasticsearch.transport.netty3.Netty3Utils.maybeDie(Netty3Utils.java:195)
>       at org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:82)
>       at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>       at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
>       at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>       at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
>       at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292)
>       at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
>       at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>       at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>       at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>       at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 2018-01-01 12:15:19,448 WARN  org.elasticsearch.transport.netty3.Netty3Transport            - exception caught on transport layer [[id: 0xef889995, /172.17.0.4:48450 => /192.168.99.100:9300]], closing connection
> ElasticsearchException[java.lang.NoClassDefFoundError: org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer]; nested: NoClassDefFoundError[org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer]; nested: ClassNotFoundException[org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer];
>       at org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
>       at org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
>       at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>       at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
>       at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>       at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
>       at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292)
>       at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
>       at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>       at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>       at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>       at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError: org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer
>       at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:70)
>       at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:193)
>       ... 11 more
> Caused by: java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       ... 14 more
> {code}
> It looks like a dependency issue with Netty.
> Relevant sections of the POM:
> {code}
> ...
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.4.0</flink.version>
>     <slf4j.version>1.7.7</slf4j.version>
>     <log4j.version>1.2.17</log4j.version>
>     <scala.binary.version>2.11</scala.binary.version>
> </properties>
> ...
> <dependencies>
> ...
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
>         <version>1.4.0</version>
>     </dependency>
> ...
> </dependencies>
> <profiles>
>     <profile>
>         <!-- Profile for packaging correct JAR files -->
>         <id>build-jar</id>
> ...
>         <dependencies>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
>                 <version>1.4.0</version>
>             </dependency>
>         </dependencies>
>         <build>
>             <plugins>
>                 <!-- disable the exclusion rules -->
>                 <plugin>
>                     <groupId>org.apache.maven.plugins</groupId>
>                     <artifactId>maven-shade-plugin</artifactId>
>                     <version>2.4.1</version>
>                     <executions>
>                         <execution>
>                             <phase>package</phase>
>                             <goals>
>                                 <goal>shade</goal>
>                             </goals>
>                             <configuration>
>                                 <transformers>
>                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>                                 </transformers>
>                                 <artifactSet>
>                                     <excludes combine.self="override"></excludes>
>                                 </artifactSet>
>                             </configuration>
>                         </execution>
>                     </executions>
>                 </plugin>
>             </plugins>
>         </build>
> ...
> {code}
> Elasticsearch itself is working, as I am able to create indices and connect through
> Kibana (in a separate Docker container).


