Re: flink 1.9.1 guava version conflict

2019-11-18 Thread tison
Can you use the flink-cep module directly?

If this is a customized flink lib, could you share more details about how it was packaged and what changes were made?

The issue here is that the different guava versions were not shaded properly, so the classloader hits a conflict during resolution. This problem is ad hoc,
and we need to understand in more detail how you did this step: "I downloaded the source code, compiled the
flink-cep related jars myself, and then brought them in."

Best,
tison.


孙森 wrote on Tue, Nov 19, 2019 at 2:23 PM:

> A follow-up:
>
> I downloaded the source code, compiled the flink-cep related jars myself,
> and then brought them in. From the pom file, flink-shaded-guava is pulled in by flink-core.
>
> > On Nov 19, 2019 at 2:20 PM, 孙森 wrote:
> >
> > I am using flink release-1.9.1 in my project and keep getting: Caused by:
> java.lang.NumberFormatException: Not a version: 9
> > java -version
> > java version "1.8.0_111"
> > Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
> > Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
> >
> > scala 2.11
> >
> > maven  3.6.1
> >
> > The full error message is as follows:
> > Exception in thread "main"
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.NumberFormatException: Not a version: 9
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)
> >
> >   at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
> >   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
> >   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> >   at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> >
> >   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> >   at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> >   at scala.App$$anonfun$main$1.apply(App.scala:76)
> >   at scala.App$$anonfun$main$1.apply(App.scala:76)
> >   at scala.collection.immutable.List.foreach(List.scala:381)
> >   at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> >   at scala.App$class.main(App.scala:76)
> >
> > Caused by: java.lang.NumberFormatException: Not a version: 9
> >   at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
> >   at
> scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
> >   at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
> >   at
> scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
> >   at
> scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
> >   at
> scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
> >   at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
> >   at
> scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
> >   at
> scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
> >   at
> scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
> >   at scala.tools.nsc.Global.classPath(Global.scala:128)
> >   at
> scala.tools.nsc.backend.jvm.BTypesFromSymbols.(BTypesFromSymbols.scala:39)
> >   at
> scala.tools.nsc.backend.jvm.BCodeIdiomatic.(BCodeIdiomatic.scala:24)
> >   at
> scala.tools.nsc.backend.jvm.BCodeHelpers.(BCodeHelpers.scala:23)
> >   at
> scala.tools.nsc.backend.jvm.BCodeSkelBuilder.(BCodeSkelBuilder.scala:25)
> >   at
> scala.tools.nsc.backend.jvm.BCodeBodyBuilder.(BCodeBodyBuilder.scala:25)
> >   at
> scala.tools.nsc.backend.jvm.BCodeSyncAndTry.(BCodeSyncAndTry.scala:21)
> >   at scala.tools.nsc.backend.jvm.GenBCode.(GenBCode.scala:47)
> >   at scala.tools.nsc.Global$genBCode$.(Global.scala:675)
> >   at 

Re: flink 1.9.1 guava version conflict

2019-11-18 Thread 孙森
A follow-up:

I downloaded the source code, compiled the flink-cep related jars myself, and then brought them in. From the pom file, flink-shaded-guava is pulled in by flink-core.

> On Nov 19, 2019 at 2:20 PM, 孙森 wrote:
> 
> I am using flink release-1.9.1 in my project and keep getting: Caused by: java.lang.NumberFormatException:
> Not a version: 9
> java -version
> java version "1.8.0_111"
> Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
> 
> scala 2.11
> 
> maven  3.6.1
> 
> The full error message is as follows:
> Exception in thread "main" 
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.NumberFormatException: Not a version: 9
>   at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>   at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>   at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>   at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>   at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>   at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)
> 
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   
> Caused by: java.lang.NumberFormatException: Not a version: 9
>   at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
>   at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
>   at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
>   at 
> scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
>   at 
> scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
>   at 
> scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
>   at 
> scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
>   at scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
>   at scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
>   at scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
>   at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
>   at 
> scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
>   at scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
>   at 
> scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
>   at scala.tools.nsc.Global.classPath(Global.scala:128)
>   at 
> scala.tools.nsc.backend.jvm.BTypesFromSymbols.(BTypesFromSymbols.scala:39)
>   at 
> scala.tools.nsc.backend.jvm.BCodeIdiomatic.(BCodeIdiomatic.scala:24)
>   at 
> scala.tools.nsc.backend.jvm.BCodeHelpers.(BCodeHelpers.scala:23)
>   at 
> scala.tools.nsc.backend.jvm.BCodeSkelBuilder.(BCodeSkelBuilder.scala:25)
>   at 
> scala.tools.nsc.backend.jvm.BCodeBodyBuilder.(BCodeBodyBuilder.scala:25)
>   at 
> scala.tools.nsc.backend.jvm.BCodeSyncAndTry.(BCodeSyncAndTry.scala:21)
>   at scala.tools.nsc.backend.jvm.GenBCode.(GenBCode.scala:47)
>   at scala.tools.nsc.Global$genBCode$.(Global.scala:675)
>   at scala.tools.nsc.Global.genBCode$lzycompute(Global.scala:671)
>   at scala.tools.nsc.Global.genBCode(Global.scala:671)
>   at 
> scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.serialVUID(GenASM.scala:1240)
>   at 
> scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1329)
>   at 
> 

flink 1.9.1 guava version conflict

2019-11-18 Thread 孙森
I am using flink release-1.9.1 in my project and keep getting: Caused by: java.lang.NumberFormatException: Not
a version: 9
 java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

scala 2.11

maven  3.6.1

The full error message is as follows:
Exception in thread "main" 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.NumberFormatException: Not a version: 9
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)

at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)

Caused by: java.lang.NumberFormatException: Not a version: 9
at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
at 
scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
at 
scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
at 
scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
at 
scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
at scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
at scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
at scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
at 
scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
at scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
at 
scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
at scala.tools.nsc.Global.classPath(Global.scala:128)
at 
scala.tools.nsc.backend.jvm.BTypesFromSymbols.(BTypesFromSymbols.scala:39)
at 
scala.tools.nsc.backend.jvm.BCodeIdiomatic.(BCodeIdiomatic.scala:24)
at 
scala.tools.nsc.backend.jvm.BCodeHelpers.(BCodeHelpers.scala:23)
at 
scala.tools.nsc.backend.jvm.BCodeSkelBuilder.(BCodeSkelBuilder.scala:25)
at 
scala.tools.nsc.backend.jvm.BCodeBodyBuilder.(BCodeBodyBuilder.scala:25)
at 
scala.tools.nsc.backend.jvm.BCodeSyncAndTry.(BCodeSyncAndTry.scala:21)
at scala.tools.nsc.backend.jvm.GenBCode.(GenBCode.scala:47)
at scala.tools.nsc.Global$genBCode$.(Global.scala:675)
at scala.tools.nsc.Global.genBCode$lzycompute(Global.scala:671)
at scala.tools.nsc.Global.genBCode(Global.scala:671)
at 
scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.serialVUID(GenASM.scala:1240)
at 
scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1329)
at 
scala.tools.nsc.backend.jvm.GenASM$AsmPhase.emitFor$1(GenASM.scala:198)
at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:204)
at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1528)
at 

Re: [DISCUSS] Support configure remote flink jar

2019-11-18 Thread Thomas Weise
There is a related use case (not specific to HDFS) that I came across:

It would be nice if the jar upload endpoint could accept the URL of a jar
file as an alternative to the jar file itself. Such a URL could point to an
artifactory or distributed file system.

Thomas


On Mon, Nov 18, 2019 at 7:40 PM Yang Wang  wrote:

> Hi tison,
>
> Thanks for starting this discussion.
> * For a user-customized flink-dist jar, it is a useful feature, since it
> avoids uploading the flink-dist jar every time. Especially in a production
> environment, it could accelerate the submission process.
> * For the standard flink-dist jar, FLINK-13938 [1] could solve the
> problem. Upload an official flink release binary to distributed
> storage (HDFS) first, and then all submissions can benefit from it. Users
> could also upload a customized flink-dist jar to accelerate their submissions.
>
> If the flink-dist jar can be specified as a remote path, maybe the user
> jar should be handled in the same way.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>
> tison wrote on Tue, Nov 19, 2019 at 11:17 AM:
>
> > Hi folks,
> >
> > Recently, our customers have asked for a feature to configure a remote
> > flink jar. I'd like to reach out to you
> > to see whether it is a general need.
> >
> > At the moment, Flink only supports configuring a local file as the flink
> > jar via the `-yj` option. If we pass an HDFS file
> > path, it fails with an IllegalArgumentException due to an
> > implementation detail. In the story where we support
> > configuring a remote flink jar, this limitation is eliminated. We also make
> > use of YARN locality to reduce the upload
> > overhead by asking YARN to localize the jar when the
> > AM container starts.
> >
> > Besides, it possibly overlaps with FLINK-13938. I'd like to put the
> > discussion on our
> > mailing list first.
> >
> > Are you looking forward to such a feature?
> >
> > @Yang Wang: this feature is different from the one we discussed offline; it
> > only covers the flink jar, not
> > all ship files.
> >
> > Best,
> > tison.
> >
>


Re: how to setup a ha flink cluster on k8s?

2019-11-18 Thread Yang Wang
Hi Rock,

If you want to start an HA flink cluster on k8s, the simplest way is to use
ZK + HDFS/S3,
just like the HA configuration on YARN. The zookeeper-operator can help you
start a ZK
cluster [1]. Please share more information about why it did not work.

If you are using a kubernetes per-job cluster, the job can be recovered
when the JM pod
crashes and is restarted [2]. Savepoints can also be used to get better
recovery.

[1].https://github.com/pravega/zookeeper-operator
[2].
https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster

vino yang wrote on Sat, Nov 16, 2019 at 5:00 PM:

> Hi Rock,
>
> I searched on Google and found a blog [1] that talks about how to configure JM HA
> for Flink on k8s. I do not know whether it is suitable for you or not. Please
> feel free to refer to it.
>
> Best,
> Vino
>
> [1]:
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>
> Rock wrote on Sat, Nov 16, 2019 at 11:02 AM:
>
>> I'm trying to set up a flink cluster on k8s for production use. But the
>> setup here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
>> is not HA: when the job-manager goes down and is rescheduled,
>> the metadata for the running job is lost.
>>
>> I tried the HA setup with ZK
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html
>> on k8s, but can't get it right.
>>
>> Storing a job's metadata on k8s using a PVC or other external file
>> system should be very easy. Is there a way to achieve it?
>>
>


Re: [DISCUSS] Support configure remote flink jar

2019-11-18 Thread Yang Wang
Hi tison,

Thanks for starting this discussion.
* For a user-customized flink-dist jar, it is a useful feature, since it
avoids uploading the flink-dist jar every time. Especially in a production
environment, it could accelerate the submission process.
* For the standard flink-dist jar, FLINK-13938 [1] could solve the
problem. Upload an official flink release binary to distributed
storage (HDFS) first, and then all submissions can benefit from it. Users
could also upload a customized flink-dist jar to accelerate their submissions.

If the flink-dist jar can be specified as a remote path, maybe the user
jar should be handled in the same way.

[1]. https://issues.apache.org/jira/browse/FLINK-13938

tison wrote on Tue, Nov 19, 2019 at 11:17 AM:

> Hi folks,
>
> Recently, our customers have asked for a feature to configure a remote
> flink jar. I'd like to reach out to you
> to see whether it is a general need.
>
> At the moment, Flink only supports configuring a local file as the flink
> jar via the `-yj` option. If we pass an HDFS file
> path, it fails with an IllegalArgumentException due to an
> implementation detail. In the story where we support
> configuring a remote flink jar, this limitation is eliminated. We also make
> use of YARN locality to reduce the upload
> overhead by asking YARN to localize the jar when the
> AM container starts.
>
> Besides, it possibly overlaps with FLINK-13938. I'd like to put the
> discussion on our
> mailing list first.
>
> Are you looking forward to such a feature?
>
> @Yang Wang: this feature is different from the one we discussed offline; it
> only covers the flink jar, not
> all ship files.
>
> Best,
> tison.
>


[DISCUSS] Support configure remote flink jar

2019-11-18 Thread tison
Hi folks,

Recently, our customers have asked for a feature to configure a remote flink jar. I'd
like to reach out to you
to see whether it is a general need.

At the moment, Flink only supports configuring a local file as the flink jar via the `-yj`
option. If we pass an HDFS file
path, it fails with an IllegalArgumentException due to an
implementation detail. In the story where we support
configuring a remote flink jar, this limitation is eliminated. We also make use
of YARN locality to reduce
the upload overhead by asking YARN to localize the jar when the AM
container starts.

Besides, it possibly overlaps with FLINK-13938. I'd like to put the
discussion on our
mailing list first.

Are you looking forward to such a feature?

@Yang Wang: this feature is different from the one we discussed offline; it
only covers the flink jar, not
all ship files.

Best,
tison.


Re: Collections as Flink job parameters

2019-11-18 Thread Zhu Zhu
Hi Протченко,

Yes, you cannot get a Map argument from ParameterTool directly.
ParameterTool fetches and stores data in the form of strings, so it is not
feasible to support arbitrary types of configuration values that users
may set.

A workaround is to convert the map to a single string up front and parse it later
in the main method.
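A minimal sketch of that workaround in Java (the `--user.params` argument name and the `parseMap` helper below are made up for illustration; only ParameterTool.fromArgs and get are actual ParameterTool methods):

import org.apache.flink.api.java.utils.ParameterTool;

import java.util.HashMap;
import java.util.Map;

public class MapParameterExample {

    public static void main(String[] args) {
        // e.g. --user.params "redis.host=10.0.0.1,redis.port=6379"
        ParameterTool params = ParameterTool.fromArgs(args);
        Map<String, String> userParams = parseMap(params.get("user.params", ""));
        System.out.println(userParams);
    }

    // Parses a flattened "k1=v1,k2=v2" string back into a Map.
    private static Map<String, String> parseMap(String encoded) {
        Map<String, String> map = new HashMap<>();
        if (encoded.isEmpty()) {
            return map;
        }
        for (String pair : encoded.split(",")) {
            String[] kv = pair.split("=", 2);
            map.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
        }
        return map;
    }
}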

Thanks,
Zhu Zhu

Протченко Алексей wrote on Tue, Nov 19, 2019 at 12:29 AM:

>
> Hello all.
>
> I have a question about providing complex configuration to a Flink job. We
> are working on a kind of platform for running user-defined packages
> which contain the actual business logic. We provide all the parameters via
> the command line and parse them with ParameterTool. That's fine as long as we
> have parameters of simple types like String, int, etc. But the problem is
> that we need to add a Map of custom parameters so that users can provide
> configuration variables specific to their code.
>
> Reading the documentation and code of ParameterTool, I do not see a clear
> way to do it. Is using a third-party argument parser the only
> option?
>
> Best regards,
> Alex
>
>
> --
> Алексей Протченко
>


Re: Flink configuration at runtime

2019-11-18 Thread vino yang
Hi Amran,

Change the config option at runtime? No, Flink does not currently support this
feature.

However, for Flink on YARN in job cluster mode, you can specify different
config options for different jobs via the program or via flink-conf.yaml (copy a new
flink binary package and then change the config file).

Best,
Vino

amran dean wrote on Tue, Nov 19, 2019 at 5:53 AM:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, and it's low cost. For other jobs, I don't want
> such a large number.
>
>
>


Re: Flink configuration at runtime

2019-11-18 Thread Zhu Zhu
Hi Amran,

Some configs, including "state.checkpoints.num-retained", are cluster
configs that always apply to the entire Flink cluster.
An alternative is to use per-job mode if you are running Flink jobs on
k8s/docker or YARN, which creates a dedicated Flink cluster for a single job.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/docker.html#flink-job-cluster
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn

Thanks,
Zhu Zhu

amran dean wrote on Tue, Nov 19, 2019 at 5:53 AM:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, and it's low cost. For other jobs, I don't want
> such a large number.
>
>
>


Flink configuration at runtime

2019-11-18 Thread amran dean
Is it possible to configure certain settings at runtime, on a per-job basis
rather than globally within flink-conf.yaml?

For example, I have a job where it's desirable to retain a large number of
checkpoints via
state.checkpoints.num-retained.

The checkpoints are cheap, and it's low cost. For other jobs, I don't want
such a large number.


RE: SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Hanan Yehudai
Hi Peter. Thanks.
This is my code. I used one of the parquet/avro tests as a reference.

The code fails with:
Test testScan(ParquetTestCase) failed with:
java.lang.UnsupportedOperationException
   at 
org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
   at 
org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
   at 
org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)


CODE :

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBuilderBase;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;

import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.Assert.assertEquals;

import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;


public class ParquetTestCase extends MultipleProgramsTestBase {

    private static String avroSchema = "{\n" +
            "  \"name\": \"SimpleRecord\",\n" +
            "  \"type\": \"record\",\n" +
            "  \"fields\": [\n" +
            "{ \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
            "{ \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
            "{ \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
            "  ],\n" +
            "  \"schema_id\": 1,\n" +
            "  \"type\": \"record\"\n" +
            "}";

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
    private static Schema schm = new Schema.Parser().parse(avroSchema);
    private static Path testPath;

    public ParquetTestCase() {
        super(TestExecutionMode.COLLECTION);
    }

    @BeforeClass
    public static void setup() throws Exception {
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);

        // Build six pairs of records: id 333 with recordType_ "Type1" and id 22 with recordType_ "Type2".
        List<IndexedRecord> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            GenericRecord gr = genericRecordBuilder.set("timestamp_edr",
                    System.currentTimeMillis() / 1000).set("id", 333L).set("recordType_", "Type1").build();
            recs.add(gr);
            GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr",
                    System.currentTimeMillis() / 1000).set("id", 22L).set("recordType_", "Type2").build();
            recs.add(gr2);
        }

        testPath = new Path("/tmp", UUID.randomUUID().toString());

        // Write the records into a parquet file using the Avro schema.
        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
                new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();

        for (IndexedRecord record : recs) {
            writer.write(record);
        }
        writer.close();
    }

    private ParquetTableSource createParquetTableSource(Path path) throws IOException {
        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
        ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                .path(path.getPath())
                .forParquetSchema(nestedSchema)
                .build();
        return parquetTableSource;
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);

Re: RocksDB state on HDFS seems not being cleaned up

2019-11-18 Thread Yun Tang
Yes, the state processor API cannot read window state at the moment; here is the issue
tracking it [1]

[1] https://issues.apache.org/jira/browse/FLINK-13095

Best
Yun Tang


From: shuwen zhou 
Date: Monday, November 18, 2019 at 12:31 PM
To: user 
Subject: Fwd: RocksDB state on HDFS seems not being cleaned up

Forwarding to the user group again since the mail server rejected it last time
-- Forwarded message -
From: shuwen zhou <jaco...@gmail.com>
Date: Wed, 13 Nov 2019 at 13:33
Subject: Re: RocksDB state on HDFS seems not being cleaned up
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>

Hi Yun,
After my investigation, I found that the files are not orphan files; they are
still recorded in the latest checkpoint's _metadata file.
I looked through the API you mentioned,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
, and it seems the state that can be accessed is limited to user-defined state. I am
thinking that the outdated state might belong to a window reduce state,
so I would like to access the window reduce state. It seems this API cannot provide
such functionality, does it?

On Thu, 7 Nov 2019 at 18:08, Yun Tang <myas...@live.com> wrote:
Yes, just sum all the file sizes within the checkpoint meta to get the full checkpoint
size (this would omit some byte stream state handles, but it is nearly accurate).
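For illustration only, a rough Java sketch of that sum using the Hadoop FileSystem API; the referencedPaths argument is assumed to be the list of hdfs:// file paths you already extracted from the checkpoint's _metadata (byte stream state handles embedded in the metadata itself are not counted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

public class CheckpointSizeSummer {

    // Sums the sizes of all files referenced by one checkpoint's _metadata.
    public static long totalCheckpointSize(List<String> referencedPaths) throws IOException {
        Configuration conf = new Configuration();
        long total = 0L;
        for (String p : referencedPaths) {
            Path path = new Path(p);
            // Resolve the file system from the path's scheme (e.g. hdfs://).
            FileSystem fs = path.getFileSystem(conf);
            total += fs.getFileStatus(path).getLen();
        }
        return total;
    }
}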

BTW, I think the user mailing list is the better place for this email thread; I have already
sent this mail to the user mailing list.

Best
Yun Tang

From: shuwen zhou <jaco...@gmail.com>
Date: Thursday, November 7, 2019 at 12:02 PM
To: Yun Tang <myas...@live.com>
Cc: dev <d...@flink.apache.org>, Till Rohrmann
<trohrm...@apache.org>
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun,
Thank you for your detailed explanation; it gives me a lot to research. I think:
1. I should try reducing "state.checkpoints.num-retained", maybe to 3,
which could decrease the size of the shared folder.
2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is
yes, maybe. I could use the state processor API you mentioned to figure it
out and get back to you.
3. I had a look at the file
/flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; there are a lot of file
names like
hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03,
and summing those files' sizes up gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.





On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
Hi Shuwen

Since you just have 10 "chk-" folders as expected, and when subsuming
checkpoints the "chk-" folder is removed after we have successfully removed the
shared state [1], I think you might not have too many orphan
state files left. To verify this, you could use the state processor API [2] to load
your checkpoints and compare all the files under the "shared" folder to see whether
there exist too many orphan files. If this is true, we might think of the
custom compaction filter feature of FRocksDB.

Secondly, your judgment of "20GB each checkpoint" might not be accurate when
RocksDB incremental checkpointing is enabled; what the UI shows is only the
incremental size [3]. I suggest you count the sizes of the files within your
checkpoint meta to know the accurate checkpoint size for each checkpoint.

Last but not least, RocksDB's compaction filter feature that deletes expired
data only runs during compaction [4]; I'm afraid you might need to look at
your RocksDB LOG files to see the frequency of compaction on the task managers.
And I think the increasing size might be related to the interval of your
checkpoints. What is the interval at which you execute checkpoints?


[1] 
https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] 

Re: SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Peter Huang
Hi Hanan,

Thanks for reporting the issue. Would you please attach your test code
here? I may help to investigate.



Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai 
wrote:

> I have tried to persist generic Avro records in a parquet file and then
> read them via ParquetTableSource, using SQL.
> It seems that the SQL is not executed properly!
>
> The persisted records are :
> Id  ,  type
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
>
> While the SQL "SELECT id ,recordType_ FROM ParquetTable" returns the
> above (which is correct),
> running: "SELECT id ,recordType_ FROM ParquetTable where
> recordType_='Type1'"
> will result in:
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
>
> As if the equals sign were an assignment and not an equality check…
>
> Am I doing something wrong? Is it an issue of GenericRecord vs
> SpecificRecord?
>
>
>


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-18 Thread Oytun Tez
Congratulations! This is exciting.


 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger  wrote:

> Hi all,
>
> I would like to announce that Ververica, with the permission of the Flink
> PMC, is launching a website called flink-packages.org. This goes back to
> an effort proposed earlier in 2019 [1].
> The idea of the site is to help developers building extensions /
> connectors / APIs etc. for Flink get attention for their projects.
> At the same time, we want to help Flink users find those ecosystem
> projects, so that they can benefit from the work. A voting and commenting
> functionality allows users to rate and discuss individual
> packages.
>
> You can find the website here: https://flink-packages.org/
>
> The full announcement is available here:
> https://www.ververica.com/blog/announcing-flink-community-packages
>
> I'm happy to hear any feedback about the site.
>
> Best,
> Robert
>
>
> [1]
> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
>


[ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-18 Thread Robert Metzger
Hi all,

I would like to announce that Ververica, with the permission of the Flink
PMC, is launching a website called flink-packages.org. This goes back to an
effort proposed earlier in 2019 [1].
The idea of the site is to help developers building extensions / connectors
/ APIs etc. for Flink get attention for their projects.
At the same time, we want to help Flink users find those ecosystem
projects, so that they can benefit from the work. A voting and commenting
functionality allows users to rate and discuss individual
packages.

You can find the website here: https://flink-packages.org/

The full announcement is available here:
https://www.ververica.com/blog/announcing-flink-community-packages

I'm happy to hear any feedback about the site.

Best,
Robert


[1]
https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E


Re: Apache Airflow - Question about checkpointing and re-run a job

2019-11-18 Thread M Singh
Thanks Congxian for your answer and reference.
Mans

On Sunday, November 17, 2019, 08:59:16 PM EST, Congxian Qiu wrote:

Hi
Yes, checkpoint data is located under the jobid dir. You can try to restore from
the retained checkpoint [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
Best,
Congxian

M Singh wrote on Mon, Nov 18, 2019 at 2:54 AM:

Folks - Please let me know if you have any advice on this question. Thanks
On Saturday, November 16, 2019, 02:39:18 PM EST, M Singh wrote:

Hi:
I have a Flink job and sometimes I need to cancel and re-run it. From what I
understand, the checkpoints for a job are saved under the job id directory at
the checkpoint location. If I run the same job again, it will get a new job id
and the checkpoint saved from the previously run job (which is saved under the
previous job's id dir) will not be used for this new run. Is that a correct
understanding? If I need to re-run the job from the previous checkpoint - is
there any way to do that automatically without using a savepoint?
Also, I believe internal job restarts do not change the job id, so in those
cases the restarted job will pick up the state from the saved checkpoint. Is
my understanding correct?
Thanks
Mans

SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Hanan Yehudai
I have tried to persist generic Avro records in a parquet file and then read them
via ParquetTableSource, using SQL.
It seems that the SQL is not executed properly!

The persisted records are :
Id  ,  type
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2

While the SQL "SELECT id ,recordType_ FROM ParquetTable" returns the above
(which is correct),
running: "SELECT id ,recordType_ FROM ParquetTable where
recordType_='Type1'"
will result in:
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1

As if the equals sign were an assignment and not an equality check…

Am I doing something wrong? Is it an issue of GenericRecord vs
SpecificRecord?




Re: Keyed raw state - example

2019-11-18 Thread bastien dine
Hello Congxian,

Thanks for your response.
Don't you have an example with an operator extending
AbstractUdfStreamOperator,
using context.getRawKeyedStateInputs() (and its output counterpart for snapshots)?

The TimeService reimplements the whole machinery itself :/
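For what it's worth, here is a rough, untested sketch of such an operator written directly against AbstractStreamOperator (an AbstractUdfStreamOperator subclass would override the same two hooks). The class name, element type, and the int/UTF/long wire format are invented for illustration; only getRawKeyedStateInputs()/getRawKeyedOperatorStateOutput(), startNewKeyGroup() and KeyGroupRangeAssignment are actual Flink API, and the operator must be applied on a keyed stream via DataStream#transform:

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Counts elements per key in a plain heap map; the map is only serialized into
// raw keyed state on snapshot and read back on restore. Apply it on a keyed
// stream, e.g. stream.keyBy(v -> v).transform("rawState", Types.STRING, new RawKeyedStateCountOperator()).
public class RawKeyedStateCountOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private transient Map<String, Long> counts;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        counts = new HashMap<>();
        // On restore, each provider hands back the raw bytes written for one key group.
        for (KeyGroupStatePartitionStreamProvider provider : context.getRawKeyedStateInputs()) {
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(provider.getStream());
            int numEntries = in.readInt();
            for (int i = 0; i < numEntries; i++) {
                counts.put(in.readUTF(), in.readLong());
            }
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        KeyedStateCheckpointOutputStream rawOut = context.getRawKeyedOperatorStateOutput();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(rawOut);

        // Bucket the entries by the key group their key belongs to.
        int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        Map<Integer, List<Map.Entry<String, Long>>> byKeyGroup = new HashMap<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(entry.getKey(), maxParallelism);
            byKeyGroup.computeIfAbsent(keyGroup, k -> new ArrayList<>()).add(entry);
        }

        // Raw keyed state is written per key group; write every group in this
        // subtask's range, including empty ones, so restore always finds a count.
        for (int keyGroup : rawOut.getKeyGroupList()) {
            rawOut.startNewKeyGroup(keyGroup);
            List<Map.Entry<String, Long>> entries =
                    byKeyGroup.getOrDefault(keyGroup, Collections.emptyList());
            out.writeInt(entries.size());
            for (Map.Entry<String, Long> entry : entries) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        }
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        counts.merge(element.getValue(), 1L, Long::sum);
        output.collect(element);
    }
}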

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Mon, Nov 18, 2019 at 03:19, Congxian Qiu wrote:

> Hi
>    Currently, I think you can refer to the implementation of the timer service [1],
> which uses raw keyed state; the snapshot happens in
> AbstractStreamOperator#snapshotState(). To use raw state you need to
> implement a new operator [2]. There is also an issue that aims to provide some examples
> for raw state [3].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#raw-and-managed-state
> [3] https://issues.apache.org/jira/browse/FLINK-14379
>
> Best,
> Congxian
>
>
> bastien dine wrote on Sat, Nov 16, 2019 at 5:57 AM:
>
>> Hello everyone,
>>
>> I would like to know if anybody has a working example of how to declare a
>> keyed raw state (in my case a KeyedProcessOperator) and how to use it in
>> my UDF (KeyedProcessFunction)?
>>
>> Basically we have a huge problem with a ValueState with RocksDB: it gets
>> serialized for every element (we need to access and update it), so it's taking
>> a crazy amount of time, and we would like to have it serialized only on
>> snapshot, so using raw state is a possible good solution.
>> But I cannot find any example of it :/
>>
>> Thanks and best regards,
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
>>
>>
>>
>


Re: possible backwards compatibility issue between 1.8->1.9?

2019-11-18 Thread Tzu-Li (Gordon) Tai
Hi Bekir,

Before diving deeper, just to rule out the obvious:
Have you changed anything with the element type of the input stream to the
async wait operator?

This wasn't apparent from the information so far, so I want to quickly
clear that out of the way first.

Cheers,
Gordon

On Wed, Oct 30, 2019 at 11:52 PM Bekir Oguz  wrote:

> Hi guys,
> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fail to start with
> the following exception. We deploy the job with 'allow-non-restored-state'
> option and from the latest checkpoint dir of the 1.8.1 version.
>
> org.apache.flink.util.StateMigrationException: The new state typeSerializer
> for operator state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:323)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:214)
> at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
> .initializeState(AsyncWaitOperator.java:272)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:281)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
> StreamTask.java:881)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:395)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
> We see from the Web UI that the 'async wait operator' is causing this,
> which is not changed at all during this upgrade.
>
> All other jobs are migrated without problems, only this one is failing. Has
> anyone else experienced this during migration?
>
> Regards,
> Bekir Oguz
>


Re: ./bin/yarn-session.sh startup: Error while running the Flink Yarn session

2019-11-18 Thread 李军


OK, I'll try another version in a moment. I also looked at the message printed at startup: "Unable to load native-hadoop library for
your platform."
Is this also a problem? See the first screenshot at https://blog.csdn.net/qq_37518574/article/details/103125368
On Nov 18, 2019 at 16:55, tison wrote:
https://issues.apache.org/jira/browse/FLINK-10435

That's the one - if you pass a wrong parameter and the YARN app dies right after deployment, you will see exactly the symptom described in this JIRA.

Best,
tison.


tison wrote on Mon, Nov 18, 2019 at 4:53 PM:

Your screenshot is probably caused by some parameters being passed incorrectly, so YARN ran into a problem during deployment. I hit this before because I passed a wrong parameter and the YARN app
threw an exception after it started.

The endless retry you see there is a known bug; I remember it has been fixed in the last few releases. Let me find the corresponding JIRA.

Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:50 PM:

./bin/yarn-session.sh startup: Error while running the Flink Yarn session
Screenshot of the error:
https://blog.csdn.net/qq_37518574/article/details/103125368




Re: ./bin/yarn-session.sh startup: Error while running the Flink Yarn session

2019-11-18 Thread tison
Your screenshot is probably caused by some parameters being passed incorrectly, so YARN ran into a problem during deployment. I hit this before because I passed a wrong parameter and the YARN app
threw an exception after it started.

The endless retry you see there is a known bug; I remember it has been fixed in the last few releases. Let me find the corresponding JIRA.

Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:50 PM:

> ./bin/yarn-session.sh startup: Error while running the Flink Yarn session
> Screenshot of the error: https://blog.csdn.net/qq_37518574/article/details/103125368
>
>


Re: yarn-session.sh startup error

2019-11-18 Thread tison
You could edit the content of yarn-session.sh to see what command is actually executed at the end; maybe an extra space in the shell script is causing some parsing problems.

Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:44 PM:

> Found the problem:
> the name of my jar flink-shaded-hadoop-2-uber-2.8.3-7.0 (1) was wrong.
> But there is another error that I don't quite understand.
>
>
> Signature customized by NetEase Mail Master
> On Nov 18, 2019 at 16:38, 李军 wrote:
>
> HADOOP_CLASSPATH is set.
> Starting this shouldn't require specifying the program's entry class, right?
>
>
> On Nov 18, 2019 at 16:34, tison wrote:
>
> Maybe you haven't set HADOOP_CLASSPATH.
>
> See the top of the page at https://flink.apache.org/downloads.html
>
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector) then select the download that bundles the matching Hadoop
> version, download the optional pre-bundled Hadoop that matches your version
> and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
> >
>
> Best,
> tison.
>
>
> 李军 wrote on Mon, Nov 18, 2019 at 4:31 PM:
>
> Startup command: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
> Error: Error: Could not find or load main class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
>
>
> Both the ResourceManager and NodeManager are already started.
>
>


./bin/yarn-session.sh startup: Error while running the Flink Yarn session

2019-11-18 Thread 李军
./bin/yarn-session.sh startup: Error while running the Flink Yarn session
Screenshot of the error: https://blog.csdn.net/qq_37518574/article/details/103125368



Re: yarn-session.sh startup error

2019-11-18 Thread 李军
Found the problem:
the name of my jar flink-shaded-hadoop-2-uber-2.8.3-7.0 (1) was wrong.
But there is another error that I don't quite understand.




Signature customized by NetEase Mail Master
On Nov 18, 2019 at 16:38, 李军 wrote:
HADOOP_CLASSPATH is set.
Starting this shouldn't require specifying the program's entry class, right?




On Nov 18, 2019 at 16:34, tison wrote:
Maybe you haven't set HADOOP_CLASSPATH.

See the top of the page at https://flink.apache.org/downloads.html

If you plan to use Apache Flink together with Apache Hadoop (run Flink on
YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
system connector) then select the download that bundles the matching Hadoop
version, download the optional pre-bundled Hadoop that matches your version
and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH


Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:31 PM:

Startup command: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
Error: Error: Could not find or load main class
org.apache.flink.yarn.cli.FlinkYarnSessionCli


Both the ResourceManager and NodeManager are already started.


Re: yarn-session.sh startup error

2019-11-18 Thread 李军
HADOOP_CLASSPATH is set.
Starting this shouldn't require specifying the program's entry class, right?




On Nov 18, 2019 at 16:34, tison wrote:
Maybe you haven't set HADOOP_CLASSPATH.

See the top of the page at https://flink.apache.org/downloads.html

If you plan to use Apache Flink together with Apache Hadoop (run Flink on
YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
system connector) then select the download that bundles the matching Hadoop
version, download the optional pre-bundled Hadoop that matches your version
and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH


Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:31 PM:

Startup command: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
Error: Error: Could not find or load main class
org.apache.flink.yarn.cli.FlinkYarnSessionCli


Both the ResourceManager and NodeManager are already started.


Re: yarn-session.sh startup error

2019-11-18 Thread Henry

Could it be that the program doesn't specify an entry class? Try specifying it on the command line? -C com.xx.bbb







在 2019-11-18 16:31:26,"李军"  写道:
>启动命令: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
>报错:Error: Could not find or load main class 
>org.apache.flink.yarn.cli.FlinkYarnSessionCli
>
>
>ResourceManager,NodeManager都已启动。


Re: yarn-session.sh startup error

2019-11-18 Thread tison
Maybe you haven't set HADOOP_CLASSPATH.

See the top of the page at https://flink.apache.org/downloads.html

If you plan to use Apache Flink together with Apache Hadoop (run Flink on
YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
system connector) then select the download that bundles the matching Hadoop
version, download the optional pre-bundled Hadoop that matches your version
and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH


Best,
tison.


李军 wrote on Mon, Nov 18, 2019 at 4:31 PM:

> Startup command: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
> Error: Error: Could not find or load main class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
>
>
> Both the ResourceManager and NodeManager are already started.


yarn-session.sh startup error

2019-11-18 Thread 李军
Startup command: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
Error: Error: Could not find or load main class
org.apache.flink.yarn.cli.FlinkYarnSessionCli


Both the ResourceManager and NodeManager are already started.

Re: Broadcast checkpoint serialization fail

2019-11-18 Thread Vasily Melnik
Hi all,
We found the solution:
the problem is the Comparator in the TreeSet we used as the value of the broadcast
state. Kryo is unable to serialize a lambda in the Comparator, so we changed it to a
regular class - and everything is fine now.
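For illustration, a minimal sketch of that kind of change in Java; the Event type and its timestamp field are made up, standing in for whatever the TreeSet in broadcast state actually holds:

import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

public class BroadcastStateComparators {

    // Hypothetical element type; stands in for whatever the TreeSet holds.
    public static class Event implements Serializable {
        public long timestamp;
    }

    // A named, serializable comparator class. Kryo can instantiate and serialize this,
    // whereas a lambda like (a, b) -> Long.compare(a.timestamp, b.timestamp)
    // may fail with a NullPointerException inside DefaultClassResolver.writeClass.
    public static class EventTimestampComparator implements Comparator<Event>, Serializable {
        @Override
        public int compare(Event left, Event right) {
            return Long.compare(left.timestamp, right.timestamp);
        }
    }

    // Builds the TreeSet stored as the broadcast state value.
    public static TreeSet<Event> newEventSet() {
        return new TreeSet<>(new EventTimestampComparator());
    }
}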


Best regards,
Vasily Melnik

GlowByte Consulting

===

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 15 Nov 2019 at 14:29, Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all.
> In Flink 1.8 we have a strange exception that causes the job to fail:
>
> 2019-11-14 15:52:52,071 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- op4 (1/1)
> (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 2 for operator op4 (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
> operator op4 (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.NullPointerException
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at
> org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> ... 7 more
>
> As we see, the exception occurs in
> org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109),
> but what exactly is the reason?
>
> We configured the RocksDB state backend for the job with local filesystem storage.
>
>
> Best regards,
> Vasily Melnik
>


Re: Re: Submitting multiple jobs via yarn-cluster, the Flink web UI only shows the last submitted job

2019-11-18 Thread Henry



Finally figured it out, thanks a lot!





在 2019-11-18 16:09:32,"aihua li"  写道:
>这个跟每个job申请的资源有关,如果三个job申请的总资源数小于yarn总的资源,那么三个job是可以同时running,否则会有job因为资源申请不到而处于accepted状态
>资源足够的话两个job是可以调度到一台机器上的,只是端口号不同而已,可以从yarn页面上通过链接进入不同的页面查看
>
>> On Nov 18, 2019, at 3:54 PM, Henry wrote:
>> 
>> 
>> 
>> 
>> Oh, thanks a lot. I just noticed the link is http://slave1:41921/#/overview.
>> But I have two small questions. First, where can I see this port number "41921"?
>> Second, suppose I only have two machines, slave1 and slave2, but I have written three Flink programs
>> to submit to YARN. Does that mean I can see at most two running jobs at once?
>> Or, even though two of the programs run on the same node, do they get different port numbers - say slave1:1000
>> for one program and slave2:2000 for another - so that I can view them
>> through different links?
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2019-11-18 13:44:09,"aihua li"  写道:
>>> 看了你的截图,链接中是id为0018的application,里面确实只有一个job,另一个job是在id为0019的application里,你变更下链接中的applicationId,应该就能看到另一个作业了
>>> 
>>>> On Nov 17, 2019, at 4:14 PM, Henry wrote:
 
 
 
>>>> Yes, both programs are indeed running. The two programs write different data to the database and both keep updating, so the execution itself is fine. But the Running
>>>> Jobs page of the Flink web UI only shows the last submitted
>>>> program, not the one submitted first, which does look wrong. Please take a look at the link; it's my screenshot. Thanks.
 
 
 
 
 
>>>> On 2019-11-17 16:05:29, 18612537...@163.com wrote:
> Submitting two means two separate processes, so there are two jobs. I don't think there is any problem; if you click into the Flink pages, each one is definitely its own.
> 
> Sent from my iPhone
> 
>> On Nov 17, 2019, at 2:51 PM, Henry wrote:
>> 
>> 
>> Hi everyone, I'd like to ask a question. I submitted two different job programs via ./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm
>> 1024 xxx.jar, and I can also see two running apps on the YARN page. But after clicking the corresponding
>> ApplicationMaster and jumping to the Flink Dashboard page, the Running Jobs
>> page only shows the last submitted program, and only 1 task slot is used. What is going on? There is no error either. The YARN page shows both jobs in the
>> Running state, and the program submitted first is indeed running, because the data in the database keeps increasing. Thanks, everyone.
>> The screenshot is here:
>> https://img-blog.csdnimg.cn/20191115175027258.png
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>> 
>


Re: Submitting multiple jobs via yarn-cluster, the Flink web UI only shows the last submitted job

2019-11-18 Thread aihua li
This depends on the resources each job requests. If the total resources requested by the three jobs are less than YARN's total resources, then all three jobs can be RUNNING at the same time; otherwise some jobs will stay in the ACCEPTED state because their resource requests cannot be satisfied.
If the resources are sufficient, two jobs can be scheduled onto the same machine; they just use different port numbers, and you can follow the links on the YARN page to open their separate dashboards.

> On Nov 18, 2019, at 3:54 PM, Henry wrote:
> 
> 
> 
> 
> Oh, thanks a lot. I just noticed the link is http://slave1:41921/#/overview.
> But I have two small questions. First, where can I see this port number "41921"?
> Second, suppose I only have two machines, slave1 and slave2, but I have written three Flink programs
> to submit to YARN. Does that mean I can see at most two running jobs at once?
> Or, even though two of the programs run on the same node, do they get different port numbers - say slave1:1000
> for one program and slave2:2000 for another - so that I can view them
> through different links?
> 
> 
> 
> 
> 
> 
> 在 2019-11-18 13:44:09,"aihua li"  写道:
>> I looked at your screenshot. The link points to the application with id 0018, which indeed contains only one job; the other job is in the application with id 0019. Change the applicationId in the link and you should be able to see the other job.
>> 
>>> On Nov 17, 2019, at 4:14 PM, Henry wrote:
>>> 
>>> 
>>> 
>>> Yes, both programs are indeed running. The two programs write different data to the database and both keep updating, so the execution itself is fine. But the Running
>>> Jobs page of the Flink web UI only shows the last submitted
>>> program, not the one submitted first, which does look wrong. Please take a look at the link; it's my screenshot. Thanks.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 2019-11-17 16:05:29, 18612537...@163.com wrote:
 Submitting two means two separate processes, so there are two jobs. I don't think there is any problem; if you click into the Flink pages, each one is definitely its own.
 
 Sent from my iPhone
 
> On Nov 17, 2019, at 2:51 PM, Henry wrote:
> 
> 
> Hi everyone, I'd like to ask a question. I submitted two different job programs via ./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm
> 1024 xxx.jar, and I can also see two running apps on the YARN page. But after clicking the corresponding
> ApplicationMaster and jumping to the Flink Dashboard page, the Running Jobs
> page only shows the last submitted program, and only 1 task slot is used. What is going on? There is no error either. The YARN page shows both jobs in the
> Running state, and the program submitted first is indeed running, because the data in the database keeps increasing. Thanks, everyone.
> The screenshot is here:
> https://img-blog.csdnimg.cn/20191115175027258.png
> 
> 
> 
> 
> 
> 
> 
> 
> 
>>