Unfortunately, it does not work with XtreemFS. I set "fs.hdfs.hadoopconf" to the Hadoop configuration directory and tried to run the word count example:

bin/flink run -v examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar xtreemfs:///test.txt xtreemfs:///result.txt

The following error occurred:

Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:449) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:350)
    at org.apache.flink.client.program.Client.run(Client.java:242)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:349)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:336)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:976)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1000)
Caused by: java.io.IOException: The given file URI (xtreemfs:///result.txt) points to the HDFS NameNode at null, but the File System could not be initialized with that address: port out of range:-1 at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:325)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:244)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:299)
at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:267) at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.yarn.YarnJobManager$$anonfun$receiveYarnMessages$1.applyOrElse(YarnJobManager.scala:68)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: port out of range:-1
    at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
    at java.net.InetSocketAddress.<init>(InetSocketAddress.java:224)
at org.xtreemfs.common.libxtreemfs.Helper.stringToInetSocketAddress(Helper.java:82) at org.xtreemfs.common.libxtreemfs.RPCCaller.getInetSocketAddressFromAddress(RPCCaller.java:301) at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:88) at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:49) at org.xtreemfs.common.libxtreemfs.ClientImplementation.listVolumeNames(ClientImplementation.java:529) at org.xtreemfs.common.clients.hadoop.XtreemFSFileSystem.initialize(XtreemFSFileSystem.java:164) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:311)
    ... 31 more

Am 07.01.2015 um 14:04 schrieb Stephan Ewen:

Hi!

You can reference a Hadoop configuration with a defaultFS entry via "fs.hdfs.hadoopconf".

Have a look at the configuration reference for details: http://flink.incubator.apache.org/docs/0.7-incubating/config.html

Let us know if it works for XtreemFS...

Greetings,
Stephan

Am 07.01.2015 13:51 schrieb "Lukas Kairies" <[email protected] <mailto:[email protected]>>:

    Thanks, now it works :) It is possible so set a default filesystem
    in flink like in Hadoop (with fs.default.name
    <http://fs.default.name>)? Currently I always have to set the
    complete file URI like xtreemfs://<host>:<port>/file

    Best,
    Lukas
    Am 07.01.2015 um 12:22 schrieb Robert Metzger:
    Hi Lukas,

    I see that there is a XtreemFS Hadoop client
    (http://www.xtreemfs.org/download.php?t=source). WIth this
    pending pull request https://github.com/apache/flink/pull/268 you
    can use all file systems supported by hadoop with Flink (we
    support the org.apache.hadoop.FileSystems interface).

    The pull request has not been merged yet because of a failing
    test, but that should not affect you.
    If you want, you can check out the branch of my pull request

    git clone https://github.com/rmetzger/flink.git
    cd flink
    git checkout flink1266
    mvn clean install -DskipTests

    In the "flink-dist/target/flink-XXX/flink-yarn-XXX/" directory is
    the finished built.

    Let me know if you need more help or information.

    Best,
    Robert


    On Wed, Jan 7, 2015 at 12:00 PM, Lukas Kairies
    <[email protected]
    <mailto:[email protected]>> wrote:

        Hello,

        I like to test flink on YARN with the alternative file system
        XtreemFS. Therefore I have to add a jar file to flink but I
        found no possibility to do so. How can I do this? Hadoop
        works fine with XtreemFS.

        Thanks

        Lukas




Reply via email to