[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2018-01-30 Thread JIRA

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345099#comment-16345099
 ] 

Grzegorz Kołakowski edited comment on BEAM-2943 at 1/30/18 2:26 PM:


[~guenhter]

I tried to reproduce your problem and eventually managed to observe the same 
exception.

The key question is: how did you build the project? You didn't mention it in the 
description, but it turned out to be a crucial step.

The class 
{{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is 
part of the {{org.apache.beam:beam-runners-flink_2.10}} dependency, which is 
included in the uber-jar only if the {{flink-runner}} profile is used. I guess 
you simply ran {{mvn package}} instead of {{mvn package -Pflink-runner}}.
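
A quick way to tell the two builds apart is to look inside the bundled jar for the class entry. The following is only a minimal sketch: the helper names {{class_to_entry}} and {{jar_has_class}} are made up for illustration and are not part of Beam, Flink, or Maven, and it assumes {{unzip}} is installed ({{jar tf}} lists entries as well).

```shell
#!/bin/sh
# Sketch: check whether a bundled jar contains a given class.
# Helper names are hypothetical; they are not part of Beam, Flink, or Maven.

# Convert a fully qualified class name into its entry path inside a jar,
# e.g. a.b.C becomes a/b/C.class
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Exit status 0 if the jar ($1) lists the entry for the class ($2),
# non-zero otherwise. Assumes `unzip` is available on the PATH.
jar_has_class() {
  unzip -l "$1" | grep -qF "$(class_to_entry "$2")"
}
```

For example, after building, {{jar_has_class target/word-count-beam-bundled-0.1.jar org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat && echo present}} should print {{present}} only if the class made it into the bundle.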


was (Author: grzegorz_kolakowski):
[~guenhter]

I tried to reproduce your problem and eventually I managed to observe the same 
exception.

The key question is how you built the project? You didn't mentioned it in the 
desciption but it turned out to be a crucial step.

The class 
{{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is a 
part of {{org.apache.beam:beam-runners-flink_2.10}} dependency, which is 
included into uber-jar only if the {{flink-runner}} profile is used. I guess 
you just simply run {{mvn package}} instead of {{mvn package -Pflink-runner}}.

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
> Reporter: Guenther Grill
> Assignee: Aljoscha Krettek
> Priority: Major
> Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to 
> run a Beam program on a Flink cluster. 
> The output of the dependency command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started a Flink cluster of the matching version with docker-compose:
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 

[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Guenther Grill (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404
 ] 

Guenther Grill edited comment on BEAM-2943 at 9/14/17 2:57 PM:
---

I double-checked by looking into the 
{code}target/word-count-beam-bundled-0.1.jar{code} file. The class is there.


was (Author: guenhter):
I double checked it, by looking into the 
`target/word-count-beam-bundled-0.1.jar` file. The class is there.

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
> Reporter: Guenther Grill
> Assignee: Aljoscha Krettek
> Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to 
> run a Beam program on a Flink cluster. 
> The output of the dependency command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started a Flink cluster of the matching version with docker-compose:
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 
> wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
> Deserializing the InputFormat 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
>  failed: Could not read the user code wrapper: 
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at 
>