For context:
I just changed the beam-runners-flink_2.10 pom file to add the shade plugin
to
pack the flink runner dependencies in an uber jar, and exclude the flink
classes
(that I assume are present on flink. My goal is to create a fat jar and add
it to
$FLINK_HOME/lib so I can deploy any pipeline on flink (with Beam included):
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.apache.flink:flink-clients_2.10</exclude>
<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-runtime_2.10</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
Then I build the beam-flink-runner uber jar:
mvn clean package -DskipTests
And I copy the produced jar into the $FLINK_HOME/lib directory, however
when I try to submit a
simple test jar, via the Flink Web UI I get this exception:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at
org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.handleRequest(JarPlanHandler.java:42)
at
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:104)
at
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: interface
org.apache.beam.samples.EventsByLocation$Options is not visible from class
loader
at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:581)
at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:557)
at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:230)
at java.lang.reflect.WeakCache.get(WeakCache.java:127)
at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:419)
at java.lang.reflect.Proxy.getProxyClass(Proxy.java:371)
at
org.apache.beam.sdk.options.PipelineOptionsFactory.validateWellFormed(PipelineOptionsFactory.java:620)
at
org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1374)
at
org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:107)
at
org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:292)
at
org.apache.beam.samples.EventsByLocation.main(EventsByLocation.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
... 35 more
Sorry for the long exception in the message but I wanted to show all
possible details so you can help me fix this. Notice that the interface
org.apache.beam.samples.EventsByLocation$Options is a classical Beam
Options interface created to configure the Pipeline, and I tried making it
public but access does not seem to be the case, but classloading.
Thanks,
Ismael
ps.
I saw this and I think maybe it could be related:
https://stackoverflow.com/questions/211176/interface-is-not-visible-from-classloader-when-using-a-proxy
On Thu, Jul 7, 2016 at 11:30 PM, Ismaël Mejía <[email protected]> wrote:
> Both Max and Alojscha, thanks for writing, I have been a bit busy these
> last
> days with other Beam stuff and I have not been able to continue testing my
> Beam
> on flink integration, I will write back to you as soon as I have time the
> check
> this again, thanks.
>
>