Hey,

you can ignore the second error, that one is unrelated.

I also just tried out passing absolute paths for different drives and I am not encountering issues. :/

Could you check the return value of the following line? Just add it to your job.

   new File("D:\\dir\\myfile.csv").exists();

Furthermore, please check what the relative path you supply is expanded to with the following line:

   FileSystem.getLocalFileSystem().getFileStatus(new Path(<relative
   path>)).getPath();

Just to cover all bases, this is just a single node, right?

Regards,
Chesnay

On 20.10.2016 14:41, Radu Tudoran wrote:

Hi,

I know that Flink in general supports files also on windows. For example I just tested successfully with relative file paths (e.g. place the file in the local directory and give just the file name then everything is working correctly). However with absolute paths it does not work as per my previous explanation. Nevertheless, please see also the error log below.

Exception in thread "main" _org.apache.flink.runtime.client.JobExecutionException_: Job execution failed.

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(_JobManager.scala:822_)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(_JobManager.scala:768_)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(_JobManager.scala:768_)

at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(_Future.scala:24_)

at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(_Future.scala:24_)

at akka.dispatch.TaskInvocation.run(_AbstractDispatcher.scala:41_)

at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(_AbstractDispatcher.scala:401_)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(_ForkJoinTask.java:260_)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(_ForkJoinPool.java:1339_)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(_ForkJoinPool.java:1979_)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(_ForkJoinWorkerThread.java:107_)

Caused by: _java.io.IOException_: No file system found with scheme D, referenced in file URI 'D:/dir/myfile.csv'.

at org.apache.flink.core.fs.FileSystem.get(_FileSystem.java:297_)

at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(_ContinuousFileMonitoringFunction.java:120_)

at org.apache.flink.streaming.api.operators.StreamSource.run(_StreamSource.java:80_)

at org.apache.flink.streaming.api.operators.StreamSource.run(_StreamSource.java:53_)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(_SourceStreamTask.java:56_)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(_StreamTask.java:266_)

at org.apache.flink.runtime.taskmanager.Task.run(_Task.java:584_)

at java.lang.Thread.run(_Thread.java:745_)

in addition to this there is some additional error if I dig through the output logs

4:33:32,651 ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path

_java.io.IOException_: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

     at org.apache.hadoop.util.Shell.getQualifiedBinPath(_Shell.java:318_)

     at org.apache.hadoop.util.Shell.getWinUtilsPath(_Shell.java:333_)

     at org.apache.hadoop.util.Shell.<clinit>(_Shell.java:326_)

     at org.apache.hadoop.util.StringUtils.<clinit>(_StringUtils.java:76_)

at org.apache.hadoop.security.Groups.parseStaticMapping(_Groups.java:92_)

     at org.apache.hadoop.security.Groups.<init>(_Groups.java:76_)

at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(_Groups.java:239_)

at org.apache.hadoop.security.UserGroupInformation.initialize(_UserGroupInformation.java:255_)

at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(_UserGroupInformation.java:232_)

at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(_UserGroupInformation.java:718_)

at org.apache.hadoop.security.UserGroupInformation.getLoginUser(_UserGroupInformation.java:703_)

at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(_UserGroupInformation.java:605_)

at org.apache.hadoop.fs.viewfs.ViewFileSystem.<init>(_ViewFileSystem.java:130_)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(_Native Method_)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(_NativeConstructorAccessorImpl.java:57_)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(_DelegatingConstructorAccessorImpl.java:45_)

     at java.lang.reflect.Constructor.newInstance(_Constructor.java:526_)

     at java.lang.Class.newInstance(_Class.java:379_)

at java.util.ServiceLoader$LazyIterator.next(_ServiceLoader.java:373_)

     at java.util.ServiceLoader$1.next(_ServiceLoader.java:445_)

at org.apache.hadoop.fs.FileSystem.loadFileSystems(_FileSystem.java:2283_)

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(_FileSystem.java:2294_)

     at sun.reflect.NativeMethodAccessorImpl.invoke0(_Native Method_)

at sun.reflect.NativeMethodAccessorImpl.invoke(_NativeMethodAccessorImpl.java:57_)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(_DelegatingMethodAccessorImpl.java:43_)

     at java.lang.reflect.Method.invoke(_Method.java:606_)

at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getDefaultHDFSClass(_HadoopFileSystem.java:91_)

at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.<init>(_HadoopFileSystem.java:75_)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(_Native Method_)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(_NativeConstructorAccessorImpl.java:57_)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(_DelegatingConstructorAccessorImpl.java:45_)

     at java.lang.reflect.Constructor.newInstance(_Constructor.java:526_)

at org.apache.flink.core.fs.FileSystem.instantiateHadoopFileSystemWrapper(_FileSystem.java:334_)

at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(_FileSystem.java:358_)

     at org.apache.flink.core.fs.FileSystem.get(_FileSystem.java:280_)

at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(_ContinuousFileMonitoringFunction.java:120_)

at org.apache.flink.streaming.api.operators.StreamSource.run(_StreamSource.java:80_)

at org.apache.flink.streaming.api.operators.StreamSource.run(_StreamSource.java:53_)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(_SourceStreamTask.java:56_)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(_StreamTask.java:266_)

     at org.apache.flink.runtime.taskmanager.Task.run(_Task.java:584_)

     at java.lang.Thread.run(_Thread.java:745_)

*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Thursday, October 20, 2016 2:22 PM
*To:* user@flink.apache.org
*Subject:* Re: org.apache.flink.core.fs.Path error?

Hello Radu,

Flink can handle windows paths, this alone can't be the problem. If you could post the error you are getting we may pinpoint the issue, but right now i would suggest the usual: check that the path is indeed correct, that you have sufficient permissions to access the file.

And yes, you can report problems here ;)

Regards,
Chesnay

On 20.10.2016 13:17, Radu Tudoran wrote:

    Hi,

    I am running a program that is suppose to read a CSV file from the
    local disk (I am still using Flink 1.1..i did not check if the
    situation is the same for 1.2). I am currently running the test on
    a windows OS.

    I am creating the path to the file e.g. “D:\\dir\\myfile.csv”

    However, I see that the CSV reader converts this to a Path object
    from flink core

    “val inputFormat = new TupleCsvInputFormat(new Path(path),
    rowDelim, fieldDelim, typeInfo)”  In CSVTableSource

    This ends up representing the initial path as an URI and changes \
    to / resulting in ““D:/dir/myfile.csv””. The problem is that this
    is never changed when the file is actually open and accessed which
    leads to an error.

    …not sure if signaling this error here is the best place or if I
    should have used some other media..

    Best regards,

    Dr. Radu Tudoran

    Senior Research Engineer - Big Data Expert

    IT R&D Division

    cid:image007.jpg@01CD52EB.AD060EE0

    HUAWEI TECHNOLOGIES Duesseldorf GmbH

    European Research Center

    Riesstrasse 25, 80992 München

    E-mail: _radu.tudo...@huawei.com <mailto:radu.tudo...@huawei.com>_

    Mobile: +49 15209084330

    Telephone: +49 891588344173

    HUAWEI TECHNOLOGIES Duesseldorf GmbH
    Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
    <http://www.huawei.com/>
    Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
    Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
    Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
    Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

    This e-mail and its attachments contain confidential information
    from HUAWEI, which is intended only for the person or entity whose
    address is listed above. Any use of the information contained
    herein in any way (including, but not limited to, total or partial
    disclosure, reproduction, or dissemination) by persons other than
    the intended recipient(s) is prohibited. If you receive this
    e-mail in error, please notify the sender by phone or email
    immediately and delete it!


Reply via email to