When running on a Flink cluster, the job works with the absolute input path.
This is the command that works:
~/flink-1.0.3/bin$ ./flink run \
    /home/hadoop/incubator-beam/runners/flink/beam-test/target/beam-test-1.0-SNAPSHOT.jar \
    --input=/home/hadoop/kinglear.txt --output=wc-out.txt --runner=FlinkRunner
By the way, I think the README could be improved: I got some errors when
trying to build the jar for running on the Flink cluster. I created a new
Maven project and replaced its src folder with the one from the examples.
However, I then got:
[ERROR] /home/hadoop/incubator-beam/runners/flink/cluster/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java:[78,38] diamond operator is not supported in -source 1.5
  (use -source 7 or higher to enable diamond operator)
so I added
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <source>1.7</source>
    <target>1.7</target>
  </configuration>
</plugin>
to the plugins section of pom.xml.
Then I was getting:
[ERROR] /home/hadoop/incubator-beam/runners/flink/cluster/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java:[38,51] package org.apache.flink.streaming.connectors.kafka does not exist
so I added
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.0.3</version>
</dependency>
to the dependencies section of pom.xml.
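
For reference, here is a minimal sketch of how the two fragments above might sit
in pom.xml. The artifactId and version are inferred from the jar name in the
command above; the groupId is only a placeholder, and the other Beam and Flink
dependencies a real build needs are omitted here.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>

  <!-- groupId is a placeholder; artifactId and version are inferred
       from beam-test-1.0-SNAPSHOT.jar -->
  <groupId>com.example</groupId>
  <artifactId>beam-test</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <!-- other Beam and Flink dependencies omitted in this sketch -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
      <version>1.0.3</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- compile with Java 7 source level so the diamond operator is accepted -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```
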
Best regards,
Dumi
On Wed, Jul 27, 2016 at 7:48 PM, Maximilian Michels <[email protected]> wrote:
> Okay great!
>
> I think it was a permission problem before. It shouldn't matter where
> the file is as long as Flink can access it. Still puzzled about the
> error message, was that really the entire stack trace?
>
> On Wed, Jul 27, 2016 at 7:05 AM, Dumi Loghin <[email protected]> wrote:
> > Hi,
> >
> > The application is running if the input file is in the examples folder:
> >
> > ~/incubator-beam/runners/flink/examples$
> > /home/hadoop/apache-maven-3.3.9/bin/mvn exec:java
> > -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount
> > -Dinput=kinglear.txt -Doutput=wc-out.txt
> >
> > ( as opposed to "~/incubator-beam/runners/flink/examples$
> > /home/hadoop/apache-maven-3.3.9/bin/mvn exec:java
> > -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount
> > -Dinput=/home/hadoop/kinglear.txt -Doutput=wc-out.txt" that is throwing
> > the exception)
> >
> > Best regards,
> > Dumi
> >
> >
> >
> > On Tue, Jul 26, 2016 at 11:53 PM, Maximilian Michels <[email protected]> wrote:
> >>
> >> Hi Dumi,
> >>
> >> The execution fails during shutdown of the local Flink cluster. Does
> >> the program execute before the exception occurs?
> >>
> >> Best,
> >> Max
> >>
> >> On Tue, Jul 26, 2016 at 5:15 AM, Dumi Loghin <[email protected]> wrote:
> >> > Hi,
> >> >
> >> > Thank you!
> >> >
> >> > I have manually added WordCount.java and pom.xml to the latest Beam code
> >> > and tried to run it. But now I'm getting the following exception:
> >> >
> >> > :~/incubator-beam/runners/flink/examples$
> >> > /home/hadoop/apache-maven-3.3.9/bin/mvn exec:java
> >> > -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount
> >> > -Dinput=/home/hadoop/kinglear.txt -Doutput=wc-out.txt
> >> >
> >> > java.lang.reflect.InvocationTargetException
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >     at java.lang.reflect.Method.invoke(Method.java:606)
> >> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >> > Caused by: java.lang.RuntimeException: Pipeline execution failed
> >> >     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
> >> >     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
> >> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
> >> >     at org.apache.beam.runners.flink.examples.WordCount.main(WordCount.java:117)
> >> >     ... 6 more
> >> > Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
> >> >     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> >> >     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
> >> >     at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
> >> >     at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
> >> >     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> >> >     at scala.concurrent.Await$.ready(package.scala:86)
> >> >     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.shutdown(FlinkMiniCluster.scala:340)
> >> >     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:319)
> >> >     at org.apache.flink.client.LocalExecutor.stop(LocalExecutor.java:126)
> >> >     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:194)
> >> >     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
> >> >     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:116)
> >> >     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
> >> >     ... 9 more
> >> >
> >> > Best regards,
> >> > Dumi
> >> >
> >> > On Mon, Jul 25, 2016 at 7:14 PM, Maximilian Michels <[email protected]> wrote:
> >> >>
> >> >> Hi Dumi,
> >> >>
> >> >> Somewhere on the way we broke the exec configuration. I've fixed this
> >> >> in this PR and updated the README:
> >> >> https://github.com/apache/incubator-beam/pull/724
> >> >>
> >> >> Should be merged soon.
> >> >>
> >> >> Best,
> >> >> Max
> >> >>
> >> >> On Fri, Jul 22, 2016 at 2:49 PM, Lukasz Cwik <[email protected]> wrote:
> >> >> > I think runner should be org.apache.beam.runners.flink.FlinkRunner
> >> >> > and not org.apache.beam.runners.flink.FlinkRunner.class but that is
> >> >> > probably unrelated to the failure that you are seeing.
> >> >> >
> >> >> > On Fri, Jul 22, 2016 at 7:42 AM, Dumi Loghin <[email protected]> wrote:
> >> >> >>
> >> >> >> Hi,
> >> >> >>
> >> >> >> I'm still getting an exception related to arguments. Note that I'm
> >> >> >> working with commit adb472083460bc175a0685133eadb6a336f1a4a2 (22 Jul).
> >> >> >>
> >> >> >> ~/incubator-beam/runners/flink/examples$ mvn exec:java
> >> >> >> -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount
> >> >> >> -Dexec.args="--runner=org.apache.beam.runners.flink.FlinkRunner.class
> >> >> >> --inputFile=/home/hadoop/kinglear.txt --output=wc-out.txt"
> >> >> >> ...
> >> >> >> [ERROR] Failed to execute goal
> >> >> >> org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project
> >> >> >> beam-runners-flink_2.10-examples: Unable to parse configuration of mojo
> >> >> >> org.codehaus.mojo:exec-maven-plugin:1.4.0:java for parameter arguments:
> >> >> >> Cannot store value into array: ArrayStoreException -> [Help 1]
> >> >> >> ...
> >> >> >>
> >> >> >> However, I was able to run it in early June. If I revert to, for
> >> >> >> example, commit c2146b9f9d6a1f39a5699725ccb51829d751b88a (6 Jun), I'm
> >> >> >> not getting the above exceptions. Maybe something changed in the
> >> >> >> options (arguments) handling code?
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Dumi
> >> >> >>
> >> >> >> On Wed, Jul 20, 2016 at 7:35 PM, Aljoscha Krettek <[email protected]> wrote:
> >> >> >>>
> >> >> >>> I think it should work like this:
> >> >> >>>
> >> >> >>> mvn exec:java \
> >> >> >>> -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \
> >> >> >>> -Dexec.args="--runner=org.apache.beam.runners.flink.FlinkRunner.class
> >> >> >>> --inputFile='"$input"' --output='"$outfile_prefix"'"'
> >> >> >>>
> >> >> >>> On Wed, 20 Jul 2016 at 10:47 Dumi Loghin <[email protected]> wrote:
> >> >> >>>>
> >> >> >>>> Hi,
> >> >> >>>>
> >> >> >>>> I'm trying to run the WordCount example on a single Flink node, but
> >> >> >>>> I'm getting this error:
> >> >> >>>>
> >> >> >>>> ~/incubator-beam/runners/flink/examples$ mvn exec:exec
> >> >> >>>> -Dinput=/home/hadoop/kinglear.txt -Doutput=wc-out.txt
> >> >> >>>> -Drunner=FlinkRunner
> >> >> >>>> ...
> >> >> >>>> Exception in thread "main" java.lang.IllegalArgumentException: No Runner
> >> >> >>>> was specified and the DirectRunner was not found on the classpath.
> >> >> >>>> Specify a runner by either:
> >> >> >>>>     Explicitly specifying a runner by providing the 'runner' property
> >> >> >>>>     Adding the DirectRunner to the classpath
> >> >> >>>>     Calling 'PipelineOptions.setRunner(PipelineRunner)' directly
> >> >> >>>>     at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:282)
> >> >> >>>>     at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:273)
> >> >> >>>>     at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:482)
> >> >> >>>>     at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:157)
> >> >> >>>>     at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:72)
> >> >> >>>>     at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:306)
> >> >> >>>>     at org.apache.beam.runners.flink.examples.WordCount.main(WordCount.java:106)
> >> >> >>>> ...
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> I've tried with -Drunner=FlinkRunner and without it and I get the
> >> >> >>>> same error. Moreover, the runner is also set in the source:
> >> >> >>>>
> >> >> >>>> options.setRunner(FlinkRunner.class);
> >> >> >>>>
> >> >> >>>> Any suggestions?
> >> >> >>>>
> >> >> >>>> Best regards,
> >> >> >>>> Dumi
> >> >> >>
> >> >> >>
> >> >> >
> >> >
> >> >
> >
> >
>