Hi Mike,

To use the DistributedCache approach, you need HDFS or another distributed
FS available to distribute the files.

I would actually like to understand why you said "then this file is copied
to the yarn cluster and added to JVM class [...] but is ignored by TM JVM
as it is neither jar(zip) file nor directory..."
I believe you should be able to load non-class files through the
classloader as well.
Did you see any code that excludes non-class files? AFAIK the TaskManagers
have access to all files (of any type) that are passed using the --ship
option (or placed in the lib/ folder).
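
For example, something along these lines is roughly what I would expect to
work (a minimal, untested sketch; it assumes the file was shipped as
"application.conf" and is visible to the user-code classloader):

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;

public class ShippedConfigFunction extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void open(Configuration conf) throws IOException {
        // look the shipped (non-class) file up through the user-code classloader
        try (InputStream in = getRuntimeContext().getUserCodeClassLoader()
                .getResourceAsStream("application.conf")) {
            if (in == null) {
                throw new FileNotFoundException(
                        "application.conf is not visible to the user-code classloader");
            }
            Properties props = new Properties();
            props.load(in);
            // ... use props ...
        }
    }
}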


On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin <m.prya...@gmail.com>
wrote:

> Hi Nico,
>
> Thanks a lot for your help, but unfortunately, the workaround you suggested
> doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile
> method but failed: that instance is created only after the application
> master has started, by which point the classpath used to run the
> application on the YARN cluster has already been assembled by
> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to
> pass a local folder at the moment I submit the application so that it is
> included in the application's YARN classpath.
> The option you suggested works well if I need to cache a file that is
> already available to me at the moment I register it (for example, a file
> on HDFS).
>
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to
> pass user-specified folders to the YARN application classpath?
>
>
>
> Kind Regards,
> Mike Pryakhin
>
>
>
> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi Nico!
> Sounds great, will give it a try and get back with results soon.
>
> Thank you so much for your help!!
>
> Kind Regards,
> Mike Pryakhin
>
> On 21 Jun 2017, at 16:36, Nico Kruber <n...@data-artisans.com> wrote:
>
> A workaround may be to use the DistributedCache. It is apparently not
> documented much, but the JavaDoc mentions roughly how to use it:
>
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>
> /**
>  * Registers a file at the distributed cache under the given name. The file
>  * will be accessible from any user-defined function in the (distributed)
>  * runtime under a local path. Files may be local files (as long as all
>  * relevant workers have access to it), or files in a distributed file
>  * system. The runtime will copy the files temporarily to a local cache, if
>  * needed.
>  * <p>
>  * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be
>  * obtained inside UDFs via
>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>  * and provides access to
>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>  *
>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
>  *                 or "hdfs://host:port/and/path")
>  * @param name The name under which the file is registered.
>  */
> public void registerCachedFile(String filePath, String name) {
>     registerCachedFile(filePath, name, false);
> }
>
> You could pass the actual file URL to use for each instance of your job
> that requires a different file via a simple job parameter:
>
>
> public static void main(String[] args) throws Exception {
>     ParameterTool params = ParameterTool.fromArgs(args);
>
>     ...
>
>     env.registerCachedFile(params.get("config_file", <default/path>),
>             "extConfig");
>
>     ...
> }
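>
> (With ParameterTool.fromArgs, that parameter is supplied as a program
> argument when submitting the job, e.g. "--config_file
> hdfs:///configs/application.conf"; the path here is just a made-up example.)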
>
> Flink's DistributedCache will then cache the file locally, and you can use
> it in a RichFunction as in
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>
> public class MyFunction extends AbstractRichFunction {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void open(Configuration conf) throws IOException {
>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>         ...
>     }
> }
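>
> For completeness, a rough end-to-end sketch of consuming the cached file
> (untested; the class name, the properties format, and the "prefix" key are
> made up for illustration):
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.Properties;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
>
> public class ConfiguredMapper extends RichMapFunction<String, String> {
>     private static final long serialVersionUID = 1L;
>
>     private Properties props;
>
>     @Override
>     public void open(Configuration conf) throws IOException {
>         // the file registered under "extConfig" has been copied to a local path
>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>         props = new Properties();
>         try (InputStream in = new FileInputStream(file)) {
>             props.load(in);
>         }
>     }
>
>     @Override
>     public String map(String value) {
>         // use a value from the external config, with a fallback default
>         return props.getProperty("prefix", "") + value;
>     }
> }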
>
>
> Nico
>
> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>
> Hi guys,
>
> any news?
> I’ve created a jira ticket: https://issues.apache.org/jira/browse/FLINK-6949
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi all,
>
> I run my Flink job on a YARN cluster and need to supply job configuration
> parameters via a configuration file alongside the job jar (the
> configuration file can't be packaged into the job's jar file). I tried
> putting the configuration file into the folder that is passed via the
> --yarnship option of the flink run command; the file is then copied to the
> YARN cluster and added to the JVM classpath as 'path/application.conf',
> but it is ignored by the TM JVM as it is neither a jar(zip) file nor a
> directory...
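>
> (A quick way to check what the TM JVM actually got, just as a debugging
> sketch: print the classpath from inside a task, e.g. in a rich function's
> open() method:
>
>     System.out.println(System.getProperty("java.class.path"));
>
> and look for the shipped file in the TaskManager logs.)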
>
> I looked through the YarnClusterDescriptor class where the
> ENV_FLINK_CLASSPATH is built and haven't found any option to tell Flink
> (YarnClusterDescriptor in particular) to add my configuration file to the
> TM JVM classpath... Is there any way to do so? If not, would you consider
> adding the ability to ship such files? (In Spark I can simply pass any
> files via the --files option.)
>
> Thanks in advance.
>
> Kind Regards,
> Mike Pryakhin
>
>
>
>
>
