I actually just got it to work by updating Flink's Akka dependency to 2.4.9
(Akka 2.4.x no longer has a Protobuf dependency; Protobuf only came into
Flink as a transitive dependency). I did this on Flink 1.1.2, so I also had
to update the Flink Runner to 1.1.2 (I also changed the Runner pom to create
a shaded "bundled" jar). Here are the two branches that you can use to get
it to work:

 - https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
 - https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2
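
The shaded "bundled" jar mentioned above is produced with the
maven-shade-plugin. A minimal sketch of the relevant relocation config (the
shadedPattern and the plugin wiring here are assumptions, not the exact
contents of the Runner pom):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Rewrite the protobuf package so the bundled copy cannot
               clash with the protobuf version on the cluster classpath -->
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>org.apache.beam.relocated.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```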

The steps I did:
 - checkout Flink
 - run "mvn clean install -DskipTests"
 - checkout Beam
 - run "mvn clean install -DskipTests"
 - build your testing project using "mvn clean package"
 - copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar from
<beam-root>/runners/flink/runner/target to the lib/ folder of the Flink
install
 - copy the jar from your testing project to the lib/ folder as well (this
is important)
 - only now start the cluster
 - run using "bin/flink run" while also specifying your jar
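
Spelled out as shell commands, the steps above look roughly like this (the
testing-project name, its jar name, and the $FLINK_HOME location are
assumptions):

```shell
# Build the patched Flink (Akka 2.4.9, no Protobuf dependency)
git clone -b flink-1.1.2-akka-2.4.9 https://github.com/aljoscha/flink.git
(cd flink && mvn clean install -DskipTests)

# Build the matching Beam/Flink-Runner branch
git clone -b flink-1.1.2 https://github.com/aljoscha/incubator-beam.git
(cd incubator-beam && mvn clean install -DskipTests)

# Build your testing project against the snapshot artifacts
(cd my-testing-project && mvn clean package)

# Copy the bundled Runner jar AND your program jar into lib/ before
# starting the cluster
cp incubator-beam/runners/flink/runner/target/beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar "$FLINK_HOME/lib/"
cp my-testing-project/target/my-testing-project.jar "$FLINK_HOME/lib/"

# Only now start the cluster and submit, still passing the jar explicitly
"$FLINK_HOME/bin/start-cluster.sh"
"$FLINK_HOME/bin/flink" run my-testing-project/target/my-testing-project.jar
```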

One caveat is that you have to move the program jar to the lib folder as
well, because of some class loader issues; it doesn't work if you simply
pass it as an argument to "bin/flink run". Also, the Web Dashboard seems
not to work with those two jars in the lib folder, probably because there is
some stuff in those jars that shouldn't really be there.

In the future we should probably provide ready-made packages for this and
update both Flink and Beam.

Cheers,
Aljoscha

P.S. While writing this I just saw your second mail. Good that you also
found a solution! :-)

On Mon, 5 Sep 2016 at 17:42 Pawel Szczur <[email protected]> wrote:

> Thanks.
>
> As far as I can tell, Protobuf is used in two places independently in Beam:
>  - ProtoCoder, dependency in: (
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
> )
>  - in the FlinkRunner code, where it's a transitive dependency coming from
> org.apache.flink
>
> Now I'm thinking, would it be possible to shade com.google.protobuf in the
> whole Flink cluster and in flink-runner.jar, while leaving Beam untouched
> (I could use version 3.0.0-beta1 for my proto)?
> (I've tried, and I think it doesn't really work.)
>
> I will try your approach with shading the proto in Beam and my program.
> I'm using jarjar to replace the package name in the jar.
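
As an aside, a jarjar relocation of this kind is a one-line rules file; the
target package below is only an example, not the name either project uses:

```text
rule com.google.protobuf.** org.apache.flink.shaded.com.google.protobuf.@1
```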
>
> 2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> This seems to be a bigger problem. I got it to compile and (almost) run
>> by shading away the protobuf dependency of Beam in the Flink Runner jar:
>> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223.
>> This does not really work, though, since now your code will not use a
>> ProtoCoder but a SerializableCoder for your protobuf-generated class. The
>> problem is that Beam uses reflection to determine whether a ProtoCoder can
>> be used on user classes. Now, your user class will be derived from
>> com.google.protobuf.MessageOrBuilder but the Beam code will look for
>> something like flink.relocated.com.google.protobuf.MessageOrBuilder so
>> that doesn't work.
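
To make the mismatch concrete, here is a minimal standalone sketch. Beam's
real check walks the user class's type hierarchy rather than doing a plain
name lookup, and the relocated package name is an assumption, but the effect
is the same: the canonical protobuf type is simply gone after relocation.

```java
// With Beam's protobuf relocated, the canonical class name that the coder
// inference looks for no longer exists on the classpath; only something like
// "flink.relocated.com.google.protobuf.MessageOrBuilder" does. The lookup
// fails, so Beam falls back to SerializableCoder.
public class RelocationCheck {
    public static void main(String[] args) {
        try {
            Class.forName("com.google.protobuf.MessageOrBuilder");
            System.out.println("ProtoCoder applicable");
        } catch (ClassNotFoundException e) {
            System.out.println("falling back to SerializableCoder");
        }
    }
}
```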
>>
>> The only solution I see for this is to compile both Beam and the user
>> program, create a fat-jar from this and then create another fat jar where
>> Protobuf is relocated in all the code, i.e. in both the Beam code and the
>> user code. That's not a very nice solution, though.
>>
>> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>>
>>> I've tried version 2.5.0, no difference. I've found that the problem is
>>> in the Beam Coder code: it assumes Protobuf 3.0 while Flink enforces
>>> 2.5.0. I'm not sure if it's possible to enforce another proto version in
>>> Flink, or to rename it by adding some prefix like 'org.apache.flink' +
>>> 'com.google.protobuf' when it's used in Flink.
>>>
>>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> I imagine that it's caused by the classpath being different when you
>>>> run it using the Flink command. It might also be that your program fails at
>>>> a different point once you fix the first problem, due to the protobuf
>>>> mismatches.
>>>>
>>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then how is it
>>>>> that when I run it as a binary it works fine, but when sent to the
>>>>> local cluster it fails?
>>>>>
>>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>
>>>>>> Yep, as I said, the problem is most likely that Flink has a dependency
>>>>>> on a different version of protobuf, so it clashes with the version that
>>>>>> Beam provides or that you have as a dependency. Have you tried setting
>>>>>> 2.5.0 as the version, since that's what Flink uses? In the end that's
>>>>>> not a proper solution, however; both Flink and Beam should likely shade
>>>>>> their protobuf dependency. Not sure if that's possible, though.
>>>>>>
>>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the reply.
>>>>>>>
>>>>>>> I've tried that; I think it didn't work. I explicitly tried
>>>>>>> version 3.0.0-beta1 to be compatible with Beam, and in another test
>>>>>>> 3.0.0 just out of curiosity. It still didn't work.
>>>>>>>
>>>>>>> I've added explicitly to pom.xml:
>>>>>>>
>>>>>>> <dependency>
>>>>>>>     <groupId>com.google.protobuf</groupId>
>>>>>>>     <artifactId>protobuf-java</artifactId>
>>>>>>>     <version>3.0.0-beta1</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> Did I miss some param?
>>>>>>>
>>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I think this is the classic problem of dependency version
>>>>>>>> conflicts. Beam has a dependency on protobuf 3.0.0-beta1 while Flink 
>>>>>>>> has a
>>>>>>>> dependency on protobuf 2.5.0 (through Akka). I think when running your
>>>>>>>> program through the bin/flink command the order in the classpath might 
>>>>>>>> be
>>>>>>>> different and you're getting the wrong version.
>>>>>>>>
>>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>>> dependency on protobuf and shade that, so that you don't have version
>>>>>>>> conflicts.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>>
>>>>>>>>> Everything works fine if I run it through:
>>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>
>>>>>>>>> But it fails when I try to run it on a Flink cluster:
>>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>
>>>>>>>>> The ready-to-run project:
>>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>>
>>>>>>>>> Any clues?
>>>>>>>>>
>>>>>>>>> Cheers, Pawel
>>>>>>>>>
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>
>>>>>>>>> java.lang.NoSuchMethodError:
>>>>>>>>> com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at
>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at
>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>