[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345099#comment-16345099 ]

Grzegorz Kołakowski edited comment on BEAM-2943 at 1/30/18 2:26 PM:

[~guenhter] I tried to reproduce your problem and eventually I managed to observe the same exception. The key question is how you built the project. You didn't mention it in the description, but it turned out to be a crucial step. The class {{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is part of the {{org.apache.beam:beam-runners-flink_2.10}} dependency, which is included in the uber-jar only if the {{flink-runner}} profile is used. I guess you simply ran {{mvn package}} instead of {{mvn package -Pflink-runner}}.

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 (2017-08-06) x86_64 GNU/Linux
> Reporter: Guenther Grill
> Assignee: Aljoscha Krettek
> Priority: Major
> Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to run a Beam program within a Flink cluster.
> The output of the dependency command is:
> {code}
> mvn dependency:tree -Pflink-runner | grep flink
>
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the Flink cluster with the correct version with docker-compose:
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The Flink cluster works, but when I execute
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=[HOST_IP]:6123 \
>       --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> {code}
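The rebuild-and-verify sequence suggested in the comment can be sketched as follows; this is a sketch that reuses the artifact name from the report ({{target/word-count-beam-bundled-0.1.jar}}) and assumes the JDK's {{jar}} tool is on the PATH:

```shell
# Rebuild the uber-jar with the Flink runner profile active, so that the
# beam-runners-flink_2.10 dependency (and with it SourceInputFormat) is bundled.
mvn clean package -Pflink-runner

# A jar is a zip archive, so its entry list can be grepped directly to
# confirm the previously missing class is now inside the uber-jar.
jar tf target/word-count-beam-bundled-0.1.jar \
  | grep 'org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat'
```

If the grep prints nothing, the staged jar still lacks the runner classes and the JobManager will fail with the same ClassNotFoundException.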
[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404 ]

Guenther Grill edited comment on BEAM-2943 at 9/14/17 2:57 PM:

I double-checked it by looking into the {{target/word-count-beam-bundled-0.1.jar}} file. The class is there.
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
> at
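Since the exception only occurs when the uber-jar was packaged without the {{flink-runner}} profile, it can help to check which profiles a given invocation actually activates. A minimal sketch using the standard Maven help plugin:

```shell
# List the profiles that would be active for this invocation; the
# flink-runner profile must appear here for the uber-jar to contain
# the Beam Flink runner classes.
mvn help:active-profiles -Pflink-runner | grep 'flink-runner'
```

Note that {{mvn exec:java -Pflink-runner}} resolves the runner classes locally even when the staged jar was built without the profile, which is why the failure surfaces only on the JobManager rather than on the client.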