Hi,
I have run into an issue with launching Beam applications that use
KafkaIO on a flink cluster:
"java.lang.IllegalAccessError: tried to access method
com.google.common.base.Optional.<init>()V from class
com.google.common.base.Absent"
(full output, and pom.xml below)
Other Beam applications launch correctly and previous versions that used
the FlinkKafkaConsumer also worked correctly.
Running directly from Eclipse works fine.
I've reproduced the error using the code from Emanuele Cesena's
excellent Beam/Flink demo: https://github.com/ecesena/beam-starter.
His WordCount example runs correctly but StreamWordCount fails.
This occurs with any combination of Beam 0.1.0, 0.2.0, and
0.3.0-incubating with Flink 1.0.3 and 1.1.3
At first glance it looks like something needs to be shaded but so far no
joy on that front.
Can anyone point out what I've missed?
Thanks,
Wayne
Here's the output from attempting to run the Beam app:
---------------------------------------------------
# flink run -c com.dataradiant.beam.examples.StreamWordCount
./target/beam-starter-0.2.jar
Found YARN properties file /tmp/.yarn-properties-root
Using JobManager address from YARN properties
hadoop1.dades.ca/192.168.124.101:39432
11/24/2016 13:56:36 Job execution switched to status RUNNING.
11/24/2016 13:56:36 Source: Read(UnboundedKafkaSource) ->
AnonymousParDo -> AnonymousParDo -> Flat Map -> ParDo(ExtractWords) ->
AnonymousParDo(1/1) switched to SCHEDULED
11/24/2016 13:56:36 Source: Read(UnboundedKafkaSource) ->
AnonymousParDo -> AnonymousParDo -> Flat Map -> ParDo(ExtractWords) ->
AnonymousParDo(1/1) switched to DEPLOYING
11/24/2016 13:56:36 GroupByWindowWithCombiner -> AnonymousParDo(1/1)
switched to SCHEDULED
11/24/2016 13:56:36 GroupByWindowWithCombiner -> AnonymousParDo(1/1)
switched to DEPLOYING
11/24/2016 13:56:36 GroupByWindowWithCombiner -> AnonymousParDo(1/1)
switched to RUNNING
11/24/2016 13:56:36 Source: Read(UnboundedKafkaSource) ->
AnonymousParDo -> AnonymousParDo -> Flat Map -> ParDo(ExtractWords) ->
AnonymousParDo(1/1) switched to RUNNING
11/24/2016 13:56:37 Source: Read(UnboundedKafkaSource) ->
AnonymousParDo -> AnonymousParDo -> Flat Map -> ParDo(ExtractWords) ->
AnonymousParDo(1/1) switched to FAILED
java.lang.IllegalAccessError: tried to access method
com.google.common.base.Optional.<init>()V from class
com.google.common.base.Absent
at com.google.common.base.Absent.<init>(Absent.java:35)
at com.google.common.base.Absent.<clinit>(Absent.java:33)
at sun.misc.Unsafe.ensureClassInitialized(Native Method)
at
sun.reflect.UnsafeFieldAccessorFactory.newFieldAccessor(UnsafeFieldAccessorFactory.java:43)
at
sun.reflect.ReflectionFactory.newFieldAccessor(ReflectionFactory.java:140)
at java.lang.reflect.Field.acquireFieldAccessor(Field.java:1057)
at java.lang.reflect.Field.getFieldAccessor(Field.java:1038)
at java.lang.reflect.Field.getLong(Field.java:591)
at
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1663)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.ArrayList.readObject(ArrayList.java:771)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:194)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:186)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
11/24/2016 13:56:37 Job execution switched to status FAILING.
(repeated error lines deleted)
-------------------------------------
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dataradiant.beam</groupId>
<artifactId>beam-starter</artifactId>
<version>0.2</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.0.3</flink.version>
<beam.version>0.2.0-incubating</beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10-examples</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.dataradiant.beam.examples.StreamWordCount</mainClass>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.apache.flink:flink-clients_2.10</exclude>
<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-runtime_2.10</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
-------------------------------------