Hi Mike,

To use the DistributedCache approach, you need HDFS or another distributed FS available to distribute the files.
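As a side note on what the JVM itself supports: the standard classloader resource-lookup API serves any file visible on the classpath, not only class files. A minimal stdlib-only sketch (the "application.conf" name is purely illustrative — it only resolves if such a file is actually shipped on the classpath):

```java
import java.io.InputStream;

public class ClasspathResourceDemo {
    // Returns true if a resource of any type is visible on the classpath.
    static boolean onClasspath(String name) throws Exception {
        try (InputStream in =
                 ClassLoader.getSystemClassLoader().getResourceAsStream(name)) {
            return in != null;
        }
    }

    public static void main(String[] args) throws Exception {
        // A .class resource is always locatable ...
        System.out.println(onClasspath("java/lang/String.class"));
        // ... and a non-class file would be too, if it were shipped in a
        // jar or a classpath directory (illustrative name, not present here).
        System.out.println(onClasspath("application.conf"));
    }
}
```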
I would actually like to understand why you said "then this file is copied to the yarn cluster and added to JVM class [...] but is ignored by TM JVM as it is neither jar(zip) file nor directory...". I believe you should be able to load non-class files through the classloader as well. Did you see any code that excludes non-class files? Afaik the TaskManagers have access to all files (of any type) that are passed using the --ship command (or in the lib/ folder).

On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
> Hi Nico,
>
> Thanks a lot for your help, but unfortunately, the workaround you suggested doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile method but failed, because this instance is created when the application master has already been started; therefore the classpath to run the application somewhere on the YARN cluster has already been created by means of org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to pass a local folder at the moment I submit the application so that it is included in the application's YARN classpath.
> The option you suggested works well if I need to cache a file that is available to me at the moment I want to register it (for example, a file on HDFS).
>
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to pass user-specified folders to the YARN application classpath?
>
> Kind Regards,
> Mike Pryakhin
>
> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi Nico!
> Sounds great, will give it a try and return back with results soon.
>
> Thank you so much for your help!!
>
> Kind Regards,
> Mike Pryakhin
>
> On 21 Jun 2017, at 16:36, Nico Kruber <n...@data-artisans.com> wrote:
>
> A workaround may be to use the DistributedCache.
> It apparently is not documented much, but the JavaDoc mentions roughly how to use it:
>
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>
> /**
>  * Registers a file at the distributed cache under the given name. The file will be accessible
>  * from any user-defined function in the (distributed) runtime under a local path. Files
>  * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
>  * The runtime will copy the files temporarily to a local cache, if needed.
>  * <p>
>  * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to
>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>  *
>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
>  * @param name The name under which the file is registered.
>  */
> public void registerCachedFile(String filePath, String name) {
>     registerCachedFile(filePath, name, false);
> }
>
> You could pass the actual file URL to use for each instance of your job that requires a different file via a simple job parameter:
>
> public static void main(String[] args) throws Exception {
>     ParameterTool params = ParameterTool.fromArgs(args);
>
>     ...
>
>     env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");
>
>     ...
> }
>
> Flink's DistributedCache will then cache the file locally and you can use it in a RichFunction like in
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>
> public class MyFunction extends AbstractRichFunction {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void open(Configuration conf) throws IOException {
>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>         ...
>     }
> }
>
> Nico
>
> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>
> Hi guys,
>
> Any news?
> I've created a jira-ticket https://issues.apache.org/jira/browse/FLINK-6949
> <https://issues.apache.org/jira/browse/FLINK-6949>.
>
> Kind Regards,
> Mike Pryakhin
>
> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi all,
>
> I run my Flink job on a YARN cluster and need to supply job configuration parameters via a configuration file alongside the job jar (the configuration file can't be packaged into the job's jar file). I tried to put the configuration file into the folder that is passed via the --yarnship option to the flink run command; the file is then copied to the YARN cluster and added to the JVM class path like 'path/application.conf', but it is ignored by the TM JVM as it is neither a jar(zip) file nor a directory...
>
> I looked through the YarnClusterDescriptor class where the ENV_FLINK_CLASSPATH is built and haven't found any option to tell Flink (YarnClusterDescriptor especially) to add my configuration file to the TM JVM classpath... Is there any way to do so? If not, would you consider adding such an ability? (Like in Spark, where I can just pass any files via the --files option.)
>
> Thanks in advance.
>
> Kind Regards,
> Mike Pryakhin
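For the archive, a sketch of the submission discussed in this thread (paths and parameter names are illustrative, not from a tested setup): the directory given to --yarnship is shipped to the YARN containers, and the job reads the config file location via the "config_file" parameter as in Nico's registerCachedFile example above, here assuming a file the workers can reach on HDFS:

```shell
# Illustrative sketch: ship a local conf/ directory with the job (--yarnship),
# and point the job at a config file it can read at runtime (e.g. on HDFS).
flink run -m yarn-cluster \
  --yarnship ./conf \
  my-job.jar --config_file hdfs:///user/me/conf/application.conf
```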