[ https://issues.apache.org/jira/browse/FLINK-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated FLINK-10370:
-------------------------------------
    Component/s: Cluster Management

> DistributedCache does not work in job cluster mode.
> ---------------------------------------------------
>
>                 Key: FLINK-10370
>                 URL: https://issues.apache.org/jira/browse/FLINK-10370
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> In job cluster mode, the client does not follow the standard submission
> path, during which {{DistributedCacheEntries}} are written into the
> {{Configuration}}. As a result, the registered files cannot be accessed
> in the job.
>
> How to reproduce:
>
> A simple job that uses {{DistributedCache}}:
> {code}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.stream.Collectors;
>
> public class DistributedCacheViaDfsTestProgram {
>
>     public static void main(String[] args) throws Exception {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         final String inputFile = "hdfs://172.17.0.2:8020/home/hadoop-user/in";
>         final String outputFile = "/tmp/out";
>
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>
>         // Register the DFS file under the name the map function looks up.
>         env.registerCachedFile(inputFile, "test_data", false);
>
>         env.fromElements(1)
>             .map(new TestMapFunction())
>             .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
>
>         env.execute("Distributed Cache Via Blob Test Program");
>     }
>
>     static class TestMapFunction extends RichMapFunction<Integer, String> {
>
>         @Override
>         public String map(Integer value) throws Exception {
>             // Fails in job cluster mode: 'test_data' is not in the cache.
>             final Path testFile =
>                 getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
>
>             return Files.readAllLines(testFile)
>                 .stream()
>                 .collect(Collectors.joining("\n"));
>         }
>     }
> }
> {code}
>
> If one runs this program, e.g. in YARN job cluster mode, it fails with:
> {code}
> java.lang.IllegalArgumentException: File with name 'test_data' is not available. Did you forget to register the file?
>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:110)
>     at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:59)
>     at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:55)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> The job runs fine, though, if it is submitted to a YARN session cluster.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)