Re: Error while reading from hadoop sequence file

2018-12-11 Thread Akshay Mendole
Hi Stefan,
You are correct. I logged the error messages incorrectly in
my previous mail.

When I execute this code snippet

DataSource<Tuple2<Text, Text>> input =
env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class,
ravenDataDir));

I got this error

The type returned by the input format could not be automatically
determined. Please specify the TypeInformation of the produced type
explicitly by using the 'createInput(InputFormat, TypeInformation)'
method instead.

org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
flipkart.EnrichementFlink.main(EnrichementFlink.java:31)


When I provided the TypeInformation manually,

DataSource<Tuple2<Text, Text>> input =
env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class,
ravenDataDir),
TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {
}));

I started getting this error message
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2082)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1701)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:921)


When I copied flink-hadoop-compatibility_2.11-1.7.0.jar to the Flink lib
directory and executed the job again, I got this error message:
java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
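
For completeness, here is a self-contained sketch of what I am running (the
HDFS path below is a placeholder for our actual directory):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.Text;

public class SequenceFileRead {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String ravenDataDir = "hdfs:///path/to/input"; // placeholder

        // Writable types cannot be extracted automatically, hence the
        // explicit TypeInformation argument.
        DataSet<Tuple2<Text, Text>> input = env.createInput(
                HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
                TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));

        input.first(10).print();
    }
}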

Let me know if you need more information.

Thanks,
Akshay


On Tue, Dec 11, 2018 at 4:45 PM Stefan Richter wrote:

> Hi,
>
> Hi,
>
> I am a bit confused by the explanation. The exception that you mentioned,
> is it happening in the first code snippet (the one with the
> TypeInformation.of(…)) or the second one? From looking into the code, I
> would expect the exception can only happen in the second snippet (without
> TypeInformation), but I am also wondering what the exception is for the
> first snippet then, because from the code I think the exception cannot be
> the same but something different, see:
>
>
> https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L551
>
> Vs
>
>
> https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L577
>
> Can you please clarify? I would expect that it should work once you call
> the method and provide the type info, or else what exactly is the exception there.

Re: Singleton in a taskmanager

2018-12-11 Thread bupt_ljy
Hi Chen,
  They will not share the same singleton. The class is loaded by the job's 
user-code classloader, and that classloader is bound to the job's tasks. 
Therefore, different jobs' slots have different classloaders, which means 
each job's tasks reference a different copy of the class (and of its static 
fields).
  Please correct me if I’m wrong.
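
  For illustration, a minimal sketch (the class name follows Chen's example; 
this assumes the default per-job dynamic classloading):

// Loaded separately by each job's user-code classloader, so each job
// sees its own copy of INSTANCE even inside the same TaskManager JVM.
public final class MySingleton {
    private static final MySingleton INSTANCE = new MySingleton();

    private MySingleton() {}

    public static MySingleton getInstance() {
        return INSTANCE;
    }
}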


Best,
Jiayi Liao


Original Message
Sender: burgesschen tchen...@bloomberg.net
Recipient: user u...@flink.apache.org
Date: Wednesday, Dec 12, 2018 08:10
Subject: Singleton in a taskmanager


Hi Guys, I am running into a problem. I have 2 jobs running on the same 
taskmanager. Each job creates a singleton of the same class, say MySingleton 
class. Are they actually sharing the same singleton? Hope my question is clear.

Best,
Burgess Chen

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Cannot configure akka.ask.timeout

2018-12-11 Thread qi luo
Hi Alex and Lukas,

This error is controlled by another RPC timeout (which is hard-coded and not 
affected by "akka.ask.timeout"). Could you open a JIRA issue so I can propose 
a fix for that?
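
For reference, a minimal sketch of setting "akka.ask.timeout" on the client 
side (this only configures Akka's ask timeout and does not touch the 
hard-coded RPC timeout mentioned above):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class TimeoutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Picked up by the local Akka setup, but not by the hard-coded RPC timeout.
        conf.setString("akka.ask.timeout", "600 s");

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
        env.fromElements(1, 2, 3).print();
    }
}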

Cheers,
Qi

> On Dec 12, 2018, at 7:07 AM, Alex Vinnik  wrote:
> 
> Hi there,
> 
> I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.
> 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#202546747]] after [1 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> 
> akka.ask.timeout: 600s
> 
> But it looks like it is not honored. Any suggestions on what can be done?
> 
> Thanks
> 
> On 2018/07/13 10:24:16, Lukas Kircher wrote:
> > Hello,
> >
> > I have problems setting configuration parameters for Akka in Flink 1.5.0.
> > When I run a job I get the exception listed below which states that Akka
> > timed out after 1ms. I tried to increase the timeout by following the
> > Flink configuration documentation. Specifically I did the following:
> >
> > 1) Passed a configuration to the Flink execution environment with
> > `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> > 2) Passed program arguments via the run configuration in IntelliJ, e.g.
> > `-Dakka.ask.timeout:100s`
> > 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
> > standalone cluster via start-cluster.sh. The setting is reflected in
> > Flink's web interface.
> >
> > However - despite explicit configuration the default setting seems to be
> > used. The exception below states in each case that akka ask timed out
> > after 1ms.
> >
> > As my problem seems very basic I do not include an SSCCE for now but I can
> > try to build one if this helps figuring out the issue.
> >
> > --
> > [...]
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> > [...]
> > at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
> > at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
> > at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
> > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > [...]
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> > at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> > at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> > at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> > at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> > at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> > at java.lang.Thread.run(Thread.java:745)
> > [...]
> > --
> >
> > Best regards and thanks for your help,
> > Lukas



Singleton in a taskmanager

2018-12-11 Thread burgesschen
Hi Guys,

I am running into a problem. 

I have 2 jobs running on the same taskmanager. Each Job creates a singleton
of the same class, say MySingleton class. Are they actually sharing the same
singleton?

Hope my question is clear.

Best,
Burgess Chen



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Cannot configure akka.ask.timeout

2018-12-11 Thread Alex Vinnik
Hi there,

I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.

akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#202546747]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

akka.ask.timeout: 600s

But it looks like it is not honored. Any suggestions on what can be done?

Thanks

On 2018/07/13 10:24:16, Lukas Kircher wrote:
> Hello,
>
> I have problems setting configuration parameters for Akka in Flink 1.5.0.
> When I run a job I get the exception listed below which states that Akka
> timed out after 1ms. I tried to increase the timeout by following the
> Flink configuration documentation. Specifically I did the following:
>
> 1) Passed a configuration to the Flink execution environment with
> `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> 2) Passed program arguments via the run configuration in IntelliJ, e.g.
> `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
> standalone cluster via start-cluster.sh. The setting is reflected in
> Flink's web interface.
>
> However - despite explicit configuration the default setting seems to be
> used. The exception below states in each case that akka ask timed out
> after 1ms.
>
> As my problem seems very basic I do not include an SSCCE for now but I
> can try to build one if this helps figuring out the issue.
>
> --
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:745)
> [...]
> --
>
> Best regards and thanks for your help,
> Lukas


Re: How many times Flink initialize an operator?

2018-12-11 Thread Ken Krugler
It’s based on the parallelism of that operator, not the number of TaskManagers.

E.g. you can have an operator with a parallelism of one, and your cluster has 
10 TaskManagers, and you’ll only get a single instance of the operator.
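
A quick sketch of what I mean (a made-up job, just to show where the instance 
count comes from):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One MapFunction instance is created per parallel subtask of this
        // operator, regardless of how many TaskManagers the cluster has.
        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .setParallelism(1)               // -> exactly one instance cluster-wide
           .addSink(new DiscardingSink<>());

        env.execute("parallelism example");
    }
}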

— Ken


> On Dec 11, 2018, at 2:01 PM, Hao Sun  wrote:
> 
> I am using Flink 1.7 on K8S. This might not matter :D.
> 
> I think Flink only initializes the MapFunction once per TaskManager, right?
> Because Flink will serialize the execution graph and distribute it to 
> taskManagers.
> 
> Or it creates a new MapFunction for every element?
> stream.map(new MapFunction[I,O]).addSink(discard)
> 
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: How many times Flink initialize an operator?

2018-12-11 Thread Hao Sun
Ok, thanks for the clarification.
Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Tue, Dec 11, 2018 at 2:38 PM Ken Krugler 
wrote:

> It’s based on the parallelism of that operator, not the number of
> TaskManagers.
>
> E.g. you can have an operator with a parallelism of one, and your cluster
> has 10 TaskManagers, and you’ll only get a single instance of the operator.
>
> — Ken
>
>
> On Dec 11, 2018, at 2:01 PM, Hao Sun  wrote:
>
> I am using Flink 1.7 on K8S. This might not matter :D.
>
> I think Flink only initializes the MapFunction once per TaskManager, right?
> Because Flink will serialize the execution graph and distribute it to
> taskManagers.
>
> Or it creates a new MapFunction for every element?
> stream.map(new MapFunction[I,O]).addSink(discard)
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


How many times Flink initialize an operator?

2018-12-11 Thread Hao Sun
I am using Flink 1.7 on K8S. This might not matter :D.

I think Flink only initializes the MapFunction once per TaskManager, right?
Because Flink will serialize the execution graph and distribute it to
taskManagers.

Or it creates a new MapFunction for every element?
stream.map(new MapFunction[I,O]).addSink(discard)

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


Re: CodeCache is full - Issues with job deployments

2018-12-11 Thread PedroMrChaves
Hello Stefan,

Thank you for the reply.

I've taken a heap dump from a development cluster using jmap and analysed
it. To run the tests we restarted the cluster and then left a job running for
a few minutes. After that, we restarted the job a couple of times and
stopped it. After leaving the cluster with no running jobs for 20 min we
took a heap dump.

We've found out that a thread which consumes data from Kafka was still
running, with a lot of finalizer calls, as depicted in the attached
screenshot.

I will deploy a job without a Kafka consumer to see if the code cache still
increases (all of our clusters have problems with the code cache;
coincidentally, all of the deployed jobs read from Kafka).


Best Regards,
Pedro Chaves



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: sql program throw exception when new kafka with csv format

2018-12-11 Thread Timo Walther

Hi Marvin,

the CSV format is not supported for Kafka so far. Only formats that have 
the tag `DeserializationSchema` in the docs are supported.


Right now you have to implement your own DeserializationSchemaFactory or 
use JSON or Avro.
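
As a rough sketch of that workaround, a hand-rolled CSV DeserializationSchema 
for the DataStream API could look like this (the two-field row layout is only 
an example):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Assumes each Kafka record is one CSV line with two fields: id,name
public class SimpleCsvDeserializationSchema implements DeserializationSchema<Row> {

    @Override
    public Row deserialize(byte[] message) {
        String[] fields = new String(message, StandardCharsets.UTF_8).split(",", -1);
        return Row.of(Long.valueOf(fields[0]), fields[1]);
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.LONG, Types.STRING);
    }
}

Such a schema can then be passed to a plain FlinkKafkaConsumer; the Table API 
route would additionally need a matching DeserializationSchemaFactory.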


You can follow [1] to get informed once the CSV format is supported. I'm 
sure it will be merged for Flink 1.8.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-7050


On 11.12.18 10:41, Marvin777 wrote:
I register a Kafka message source with CSV format; the error message is 
as follows:


Exception in thread "main"
org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.


Reason: No context matches.


BTW, the flink version is 1.6.2 .

Thanks Marvin.

[image: image.png]





Re: sql program throw exception when new kafka with csv format

2018-12-11 Thread Hequn Cheng
Hi Marvin,

I took a look at the Flink code. It seems we can't use the CSV format
for Kafka; you can use JSON instead.
As the exception shows, Flink can't find a suitable
DeserializationSchemaFactory. Currently, only JSON and Avro provide a
DeserializationSchemaFactory.

Best, Hequn

On Tue, Dec 11, 2018 at 5:48 PM Marvin777 
wrote:

> I register a Kafka message source with CSV format; the error message is as
> follows:
>
> Exception in thread "main"
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
>
> Reason: No context matches.
>
>
> BTW, the flink version is 1.6.2 .
>
> Thanks Marvin.
>
> [image: image.png]
>
>


Re: CodeCache is full - Issues with job deployments

2018-12-11 Thread Stefan Richter
Hi,

in general, Flink uses a user-code class loader for job-specific code, and the 
lifecycle of that class loader should end with the job. This usually means that 
job-related code can be removed after the job is finished. However, objects of 
a class that was loaded by the user-code class loader must no longer be 
referenced from anywhere after the job finishes, or else the user-code class 
loader cannot be freed. Whether that is the case depends on the user code and 
the used dependencies; e.g. the user code might register some objects somewhere 
and not remove them by the end of the job. This would prevent freeing the 
user-code class loader and result in a leak. To figure out the root cause, you 
can analyse a heap dump for leaking class loaders; e.g. [1] and other sources 
on the web go deeper into this topic.
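
For illustration, a minimal made-up example of user code that pins its class 
loader:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// The shutdown hook is referenced by the JVM for its entire lifetime, so an
// instance of a user-code class (the lambda below) stays reachable after the
// job ends. That keeps the user-code class loader, and all code compiled for
// its classes, alive.
public class LeakyMapper extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // cleanup logic
        }));
    }

    @Override
    public String map(String value) {
        return value;
    }
}

The fix in such a case would be to remove the hook again in close().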

Best,
Stefan

[1] http://java.jiderhamn.se/category/classloader-leaks/ 


> On 11. Dec 2018, at 12:56, PedroMrChaves  wrote:
> 
> Hello,
> 
> Every time I deploy a flink job the code cache increases, which is expected.
> However, when I stop and start the job, or it restarts, the code cache
> continues to increase.
> 
> [image: Screenshot_2018-12-11_at_11.png]
> 
> 
> I've added the flags "-XX:+PrintCompilation -XX:ReservedCodeCacheSize=350m
> -XX:-UseCodeCacheFlushing" to Flink taskmanagers and jobmanagers, but the
> cache doesn't decrease very much, as it is depicted in the screenshot above.
> Even if I stop all the jobs, the cache doesn't decrease. 
> 
> This gets to a point where I get the error "CodeCache is full. Compiler has
> been disabled".
> 
> I've attached the taskmanagers' output with the "-XX:+PrintCompilation" flag
> activated.
> 
> flink-flink-taskexecutor.out
> 
>   
> 
> Flink: 1.6.2
> Java:  openjdk version "1.8.0_191"
> 
> Best Regards,
> Pedro Chaves.
> 
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



CodeCache is full - Issues with job deployments

2018-12-11 Thread PedroMrChaves
Hello,

Every time I deploy a flink job the code cache increases, which is expected.
However, when I stop and start the job, or it restarts, the code cache
continues to increase.

[image: Screenshot_2018-12-11_at_11.png]


I've added the flags "-XX:+PrintCompilation -XX:ReservedCodeCacheSize=350m
-XX:-UseCodeCacheFlushing" to Flink taskmanagers and jobmanagers, but the
cache doesn't decrease very much, as it is depicted in the screenshot above.
Even if I stop all the jobs, the cache doesn't decrease. 

This gets to a point where I get the error "CodeCache is full. Compiler has
been disabled".

I've attached the taskmanagers' output with the "-XX:+PrintCompilation" flag
activated.

flink-flink-taskexecutor.out

  

Flink: 1.6.2
Java:  openjdk version "1.8.0_191"

Best Regards,
Pedro Chaves.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error while reading from hadoop sequence file

2018-12-11 Thread Stefan Richter
Hi,

I am a bit confused by the explanation. The exception that you mentioned, is it 
happening in the first code snippet (the one with the TypeInformation.of(…)) or 
the second one? From looking into the code, I would expect the exception can only 
happen in the second snippet (without TypeInformation), but I am also wondering 
what the exception is for the first snippet then, because from the code I think 
the exception cannot be the same but something different, see:

https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L551

Vs

https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L577


Can you please clarify? I would expect that it should work once you call the 
method and provide the type info; or else, what exactly is the exception there?

Best,
Stefan

> On 10. Dec 2018, at 13:35, Akshay Mendole  wrote:
> 
> Hi,
>I have been facing issues while trying to read from an HDFS sequence file.
> 
> This is my code snippet
> DataSource<Tuple2<Text, Text>> input = env
> .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir),
> TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {
> }));
> 
> Upon executing this in yarn cluster mode, I am getting following error
> The type returned by the input format could not be automatically determined. 
> Please specify the TypeInformation of the produced type explicitly by using 
> the 'createInput(InputFormat, TypeInformation)' method instead.
>   
> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
>   flipkart.EnrichementFlink.main(EnrichementFlink.java:31)
> 
> 
> When I add the TypeInformation myself as follows, I run into the same issue.
> DataSource<Tuple2<Text, Text>> input = env
> .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir));
> 
> 
> 
> When I add this library to the lib folder, 
> flink-hadoop-compatibility_2.11-1.7.0.jar
> 
> 
> the error changes to this
> 
> java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
> at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
> at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
> at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
> at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
> at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
> at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
> at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> 
> 
> Can someone help me resolve this issue?
> 
> Thanks,
> Akshay
> 
> 
> 



Re: Very slow checkpoints occasionally occur

2018-12-11 Thread Stefan Richter
Hi,

Looking at the numbers, it seems to me that checkpoint execution (the times of 
the sync and async parts) is always reasonably fast once a checkpoint is 
executed on the task, but there are changes in the alignment time and in the 
time from triggering a checkpoint to executing it. As you are using windows, 
and looking at the way the state size behaves before and after the problem, I 
might have a suggestion for what could cause it. Before and during the 
problematic checkpoints, the state size is rising; after the problem is gone, 
the state size is significantly smaller. Could it be that, as time progresses 
or jumps, there is a spike in session window triggering?

When time moves, it is possible that suddenly a lot of windows are triggered, 
and when a checkpoint barrier arrives after the firing was triggered, it has 
to wait until all window firing is completed, for consistency reasons. This 
would also explain the backpressure that you observe during this period, 
coming from a lot of / expensive window firings; future events/checkpoints can 
only proceed when the firing is done. You could investigate whether that is 
what is happening and maybe take measures to avoid it, but that is highly 
dependent on your job logic.
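
To make the last point concrete: if the windows currently buffer all events 
and do the heavy work in an apply/process function at firing time, switching 
to incremental aggregation can make each firing cheap. A rough sketch (the 
event type, key field, and session gap are made up):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionCount {

    // Simple POJO standing in for the real event type.
    public static class MyEvent {
        public String userId;
    }

    // Incremental aggregation: the per-window state is a single running
    // count, so firing a window emits one value instead of iterating over
    // all buffered events.
    public static DataStream<Long> countPerSession(DataStream<MyEvent> events) {
        return events
                .keyBy("userId")
                .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
                .aggregate(new AggregateFunction<MyEvent, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(MyEvent e, Long acc) { return acc + 1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                });
    }
}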

Best,
Stefan 

> On 11. Dec 2018, at 10:26, Dongwon Kim  wrote:
> 
> Hi all,
> 
> We're facing the same problem mentioned in [1] - Very slow checkpoint 
> attempts of few tasks cause checkpoint failures and, furthermore, incur high 
> back pressure.
> We're running our Flink jobs on a cluster where
> - 2 masters + 8 worker nodes
> - all nodes, even masters, are equipped with SSD's
> - we have a separate cluster for Kafka
> - we depend largely on Hadoop-2.7.3; YARN for deploying per-job clusters and 
> HDFS for storing checkpoints and savepoints
> - All SSD's of each node serve as local-dirs for YARN NM and data-dirs for 
> HDFS DN
> - we use RocksDB state backend
> - we use the latest version, flink-1.7.0
> - we trigger checkpoints every 30 minutes and the size of state is not that 
> large as shown in the attached screenshot.
> 
> The job itself recovers from checkpoint failures and back pressure after a 
> while; [2] shows that the job recovers after three failed checkpoints.
> 
> Below is part of JM log message:
> 2018-12-10 17:24:36,150 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 14 @ 1544430276096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 17:24:57,912 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 14 for job e0cf3843cba85e8fdd5570ba18970d33 (43775252946 bytes in 
> 21781 ms).
> 2018-12-10 17:54:36,133 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 15 @ 1544432076096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:04:36,134 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 15 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:24:36,156 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 16 @ 1544433876096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:34:36,157 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 16 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:54:36,138 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 17 @ 1544435676096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:04:36,139 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 17 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 19:15:44,849 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 15 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:16:37,822 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 16 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:17:12,974 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 17 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:24:36,147 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 18 @ 1544437476096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:32:05,869 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 18 for job e0cf3843cba85e8fdd5570ba18970d33 (52481833829 bytes in 
> 449738 ms).
> #15, #16, and #17 fail due to a single task e81a7fb90d05da2bcec02a34d6f821e3, 
> which is a 

sql program throw exception when new kafka with csv format

2018-12-11 Thread Marvin777
I register a Kafka message source with CSV format; the error message is as
follows:

Exception in thread "main"
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.


Reason: No context matches.


BTW, the flink version is 1.6.2 .

Thanks Marvin.

[image: image.png]


Re: After job cancel, leftover ZK state prevents job manager startup

2018-12-11 Thread Till Rohrmann
Hi Micah,

the problem looks indeed similar to FLINK-10255. Could you tell me your
exact cluster setup (HA with standby JobManagers?)? Moreover, the logs of
all JobManagers on DEBUG level would be helpful for further debugging.

Cheers,
Till

On Tue, Dec 11, 2018 at 10:09 AM Stefan Richter 
wrote:

> Hi,
>
> Thanks for reporting the problem, I think the exception trace looks indeed
> very similar to traces in the discussion for FLINK-10184. I will pull in
> Till who worked on the fix to hear his opinion. Maybe the current fix only
> made the problem less likely to appear but is not complete yet?
>
> Best,
> Stefan
>
> > On 11. Dec 2018, at 05:19, Micah Wylde  wrote:
> >
> > Hello,
> >
> > We've been seeing an issue with several Flink 1.5.4 clusters that looks
> like this:
> >
> > 1. Job is cancelled with a savepoint
> > 2. The jar is deleted from our HA blobstore (S3)
> > 3. The jobgraph in ZK is *not* deleted
> > 4. We restart the cluster
> > 5. Startup fails in recovery because the jar is not available, with the
> stacktrace:
> >
> > 00:13:58.486 ERROR o.a.f.r.e.ClusterEntrypoint - Fatal error occurred in
> > the cluster entrypoint.
> > java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> > at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.Exception: Cannot set up the user code libraries: No such file or directory: s3://streamingplatform-production/{JOB_NAME}/flink/highavailability/{JOB_NAME}/blob/job_5a3fe2c00c05efd3a552a1c6707d2c10/blob_p-6d585831f5c947335ac505b400cf8f3630cc706a-42355c2885b668b0bc5e15b856141b0
> >
> > This superficially seems similar to several issues that have apparently
> been fixed in 1.5.4, like FLINK-10255 and FLINK-10184.
> >
> > Has anybody else seen this issue on 1.5.4 (or later) clusters? Or any
> advice for debugging?
> >
> > Thanks,
> > Micah
>
>


Re: runtime.resourcemanager

2018-12-11 Thread Piotr Nowojski
Hey,

Is that the whole Task Manager log? Have you checked for memory issues both on 
the Task Managers and the Job Manager, like out of memory / long GC pauses, as 
I suggested in the first email?

After you rule out memory issues, you could capture a couple of thread dumps 
(`kill -3 JVM_PID` or `jstack JVM_PID`) and check if any thread is stuck in 
your code.

Another potential issue: are you sure that you have a healthy network between 
nodes? No packet loss, low ping, etc.?

Piotrek

> On 10 Dec 2018, at 17:44, Alieh  wrote:
> 
> Hello,
> 
> this is the TaskManager log, but it does not change after I run the program.
> I think the Flink planner has a problem with my program. It cannot even start
> the job.
> 
> Best,
> 
> Alieh
> 
> 
> 
> 2018-12-10 12:20:20,386 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> 
> 2018-12-10 12:20:20,387 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting 
> TaskManager (Version: 1.6.0, Rev:ff472b4, Date:07.08.2018 @ 13:31:13 UTC)
> 2018-12-10 12:20:20,387 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current 
> user: alieh
> 2018-12-10 12:20:20,609 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2018-12-10 12:20:20,768 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current 
> Hadoop/Kerberos user: alieh
> 2018-12-10 12:20:20,769 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.161-b12
> 2018-12-10 12:20:20,769 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum heap 
> size: 922 MiBytes
> 2018-12-10 12:20:20,769 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME: 
> /usr/lib/jvm/java-8-oracle
> 2018-12-10 12:20:20,774 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop 
> version: 2.4.1
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM Options:
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> -XX:+UseG1GC
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> -XX:MaxDirectMemorySize=8388607T
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> -Dlog.file=/home/alieh/flink-1.6.0/log/flink-alieh-taskexecutor-0-alieh-P67A-D3-B3.log
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> -Dlog4j.configuration=file:/home/alieh/flink-1.6.0/conf/log4j.properties 
> 
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> -Dlogback.configurationFile=file:/home/alieh/flink-1.6.0/conf/logback.xml 
> 
> 2018-12-10 12:20:20,775 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program 
> Arguments:
> 2018-12-10 12:20:20,776 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> --configDir
> 2018-12-10 12:20:20,776 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> /home/alieh/flink-1.6.0/conf
> 2018-12-10 12:20:20,776 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath: 
> /home/alieh/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/home/alieh/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/home/alieh/flink-1.6.0/lib/log4j-1.2.17.jar:/home/alieh/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/home/alieh/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
> 2018-12-10 12:20:20,776 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> 
> 2018-12-10 12:20:20,777 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered 
> UNIX signal handlers for [TERM, HUP, INT]
> 2018-12-10 12:20:20,785 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum 
> number of open file descriptors is 1048576.
> 2018-12-10 12:20:20,803 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-12-10 12:20:20,803 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-12-10 12:20:20,803 INFO  
> 

Re: After job cancel, leftover ZK state prevents job manager startup

2018-12-11 Thread Stefan Richter
Hi,

Thanks for reporting the problem. I think the exception trace looks indeed very 
similar to traces in the discussion for FLINK-10184. I will pull in Till, who 
worked on the fix, to hear his opinion. Maybe the current fix only made the 
problem less likely to appear but is not complete yet?

Best,
Stefan

> On 11. Dec 2018, at 05:19, Micah Wylde  wrote:
> 
> Hello,
> 
> We've been seeing an issue with several Flink 1.5.4 clusters that looks like 
> this:
> 
> 1. Job is cancelled with a savepoint
> 2. The jar is deleted from our HA blobstore (S3)
> 3. The jobgraph in ZK is *not* deleted
> 4. We restart the cluster
> 5. Startup fails in recovery because the jar is not available, with the 
> stacktrace:
> 
> 00:13:58.486 ERROR o.a.f.r.e.ClusterEntrypoint - Fatal error occurred in the 
> cluster entrypoint.
> java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Cannot set up the user code libraries: No such file or directory: s3://streamingplatform-production/{JOB_NAME}/flink/highavailability/{JOB_NAME}/blob/job_5a3fe2c00c05efd3a552a1c6707d2c10/blob_p-6d585831f5c947335ac505b400cf8f3630cc706a-42355c2885b668b0bc5e15b856141b0
> 
> This superficially seems similar to several issues that have apparently been 
> fixed in 1.5.4, like FLINK-10255 and FLINK-10184.
> 
> Has anybody else seen this issue on 1.5.4 (or later) clusters? Or any advice 
> for debugging?
> 
> Thanks,
> Micah