I've opened a JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-4870

On 20.10.2016 16:25, Radu Tudoran wrote:

Hi,

Since you found the source of the error, I am not sure whether the outputs you asked for are still needed. Nevertheless, see below:

new File("D:\\dir\\myfile.csv").exists(); => true

FileSystem.getLocalFileSystem().getFileStatus(new Path(<relative path>)).getPath(); => “file:/D:/dir/myfile.csv”

Fabian’s suggestion of specifying the file as “file:/D:/dir/myfile.csv” => works!

Nevertheless, IMHO the issue should be fixed: it is generally more practical to specify paths in the form D:\\dir\\myfile.csv, mainly because that form is also understood by other file readers outside Flink.
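Until this is handled inside Flink, such paths could be normalized on the user side before they are passed to the CSV source. The sketch below is a hypothetical helper (the class and method names are made up for illustration and are not Flink API). It assumes only absolute paths with a single-letter drive need rewriting, and turns D:\dir\myfile.csv into the file:/D:/dir/myfile.csv form that was confirmed to work; every other string is returned unchanged.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WindowsPathToFileUri {

    // Hypothetical: matches an absolute Windows path with a drive letter,
    // e.g. D:\dir\myfile.csv or D:/dir/myfile.csv.
    private static final Pattern DRIVE_PATH = Pattern.compile("^([A-Za-z]):[\\\\/](.*)$");

    /**
     * Hypothetical helper: rewrites "D:\dir\myfile.csv" into "file:/D:/dir/myfile.csv".
     * Strings that do not start with a drive letter are returned unchanged.
     */
    static String toFlinkFileUri(String path) {
        Matcher m = DRIVE_PATH.matcher(path);
        if (!m.matches()) {
            return path; // already a URI, a relative path, or a non-Windows path
        }
        // Switch the remaining separators to '/' and put an explicit "file:" scheme in front.
        String rest = m.group(2).replace('\\', '/');
        return "file:/" + m.group(1) + ":/" + rest;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkFileUri("D:\\dir\\myfile.csv"));      // file:/D:/dir/myfile.csv
        System.out.println(toFlinkFileUri("file:/D:/dir/myfile.csv")); // unchanged
    }
}
```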

*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Thursday, October 20, 2016 4:06 PM
*To:* user@flink.apache.org
*Subject:* Re: org.apache.flink.core.fs.Path error?

I believe I found the issue. The ContinuousFileMonitoringFunction never converts the given string to a Path; it generates a URI directly from it.
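To see why the drive letter ends up being treated as a scheme, the sketch below parses the forward-slash string from the exception message with java.net.URI. It assumes, as the exception text suggests, that the string reaches the file system lookup as a plain URI without first going through Flink's Path; the class and the parseDirectly wrapper are hypothetical and simply call URI.create.

```java
import java.net.URI;

public class DriveLetterAsScheme {

    /** Hypothetical wrapper: parses the string directly as a URI, with no Path normalization. */
    static URI parseDirectly(String path) {
        return URI.create(path);
    }

    public static void main(String[] args) {
        // The string from the exception message ('\' already turned into '/').
        URI uri = parseDirectly("D:/dir/myfile.csv");
        // "D" is a syntactically valid URI scheme, so everything before the first ':'
        // becomes the scheme and the drive letter disappears from the path.
        System.out.println(uri.getScheme()); // D
        System.out.println(uri.getPath());   // /dir/myfile.csv
    }
}
```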

On 20.10.2016 15:48, Fabian Hueske wrote:

    The error message suggests that Flink tries to resolve "D:" as a
    file system scheme such as "file:" or "hdfs:".


    Can you try to specify your path as "file:/D:/dir/myfile.csv"?
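As a quick check of why the suggested form avoids the error, the sketch below parses it with java.net.URI, assuming Flink determines the scheme the same way the standard URI parser does; the class name is illustrative only.

```java
import java.net.URI;

public class ExplicitFileScheme {

    public static void main(String[] args) {
        // The suggested form: an explicit "file:" scheme in front of the drive letter.
        URI uri = URI.create("file:/D:/dir/myfile.csv");
        System.out.println(uri.getScheme()); // file
        // The drive letter now sits inside the path, so it is no longer read as a scheme.
        System.out.println(uri.getPath());   // /D:/dir/myfile.csv
    }
}
```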

    Best, Fabian

    2016-10-20 14:41 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

    Hi,

    I know that Flink in general also supports files on Windows. For
    example, I just tested relative file paths successfully (i.e., if I
    place the file in the local directory and give just the file name,
    everything works correctly). However, absolute paths do not work, as
    explained in my previous mail. Please see the error log below.

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.io.IOException: No file system found with scheme D, referenced in file URI 'D:/dir/myfile.csv'.
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:297)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

    In addition, there is another error if I dig through the output logs:

    4:33:32,651 ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:92)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:76)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:239)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
        at org.apache.hadoop.fs.viewfs.ViewFileSystem.<init>(ViewFileSystem.java:130)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2283)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2294)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getDefaultHDFSClass(HadoopFileSystem.java:91)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.<init>(HadoopFileSystem.java:75)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.flink.core.fs.FileSystem.instantiateHadoopFileSystemWrapper(FileSystem.java:334)
        at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:358)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

    *From:*Chesnay Schepler [mailto:ches...@apache.org]
    *Sent:* Thursday, October 20, 2016 2:22 PM
    *To:* user@flink.apache.org
    *Subject:* Re: org.apache.flink.core.fs.Path error?

    Hello Radu,

    Flink can handle Windows paths, so this alone can't be the problem.
    If you could post the error you are getting, we may be able to
    pinpoint the issue. Right now I would suggest the usual: check that
    the path is indeed correct and that you have sufficient permissions
    to access the file.

    And yes, you can report problems here ;)

    Regards,
    Chesnay

    On 20.10.2016 13:17, Radu Tudoran wrote:

        Hi,

        I am running a program that is supposed to read a CSV file from
        the local disk (I am still using Flink 1.1; I did not check whether
        the situation is the same for 1.2). I am currently running the
        test on Windows.

        I am creating the path to the file as, e.g., “D:\\dir\\myfile.csv”.

        However, I see that the CSV reader in CsvTableSource converts this
        to a Path object from flink-core:

        “val inputFormat = new TupleCsvInputFormat(new Path(path),
        rowDelim, fieldDelim, typeInfo)”

        This ends up representing the initial path as a URI and changes
        \ to /, resulting in “D:/dir/myfile.csv”. The problem is that this
        representation is never adjusted when the file is actually opened
        and accessed, which leads to an error.

        …I am not sure whether this is the best place to report this error
        or whether I should have used some other channel.

        Best regards,

        Dr. Radu Tudoran

        Senior Research Engineer - Big Data Expert

        IT R&D Division


        HUAWEI TECHNOLOGIES Duesseldorf GmbH

        European Research Center

        Riesstrasse 25, 80992 München

        E-mail: radu.tudo...@huawei.com

        Mobile: +49 15209084330

        Telephone: +49 891588344173
